IMPALA-3200: move bufferpool under runtime

It is arguably a subcomponent of the runtime system

Change-Id: I3507066fb5cb955f0872d31a4619bd8bb7d647cd
Reviewed-on: http://gerrit.cloudera.org:8080/5165
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ac44d6c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ac44d6c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ac44d6c3

Branch: refs/heads/master
Commit: ac44d6c3ef0f71452cc47f7a61659e1b939110c3
Parents: eb03186
Author: Tim Armstrong <[email protected]>
Authored: Mon Nov 21 11:34:09 2016 -0800
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue Nov 22 07:31:34 2016 +0000

----------------------------------------------------------------------
 .clang-tidy                                     |   1 -
 be/CMakeLists.txt                               |   4 +-
 be/src/bufferpool/CMakeLists.txt                |  32 --
 be/src/bufferpool/buffer-allocator.cc           |  39 --
 be/src/bufferpool/buffer-allocator.h            |  48 --
 be/src/bufferpool/buffer-pool-counters.h        |  47 --
 be/src/bufferpool/buffer-pool-test.cc           | 554 -------------------
 be/src/bufferpool/buffer-pool.cc                | 439 ---------------
 be/src/bufferpool/buffer-pool.h                 | 426 --------------
 .../bufferpool/reservation-tracker-counters.h   |  41 --
 be/src/bufferpool/reservation-tracker-test.cc   | 378 -------------
 be/src/bufferpool/reservation-tracker.cc        | 306 ----------
 be/src/bufferpool/reservation-tracker.h         | 248 ---------
 be/src/runtime/CMakeLists.txt                   |   1 +
 be/src/runtime/bufferpool/CMakeLists.txt        |  32 ++
 be/src/runtime/bufferpool/buffer-allocator.cc   |  39 ++
 be/src/runtime/bufferpool/buffer-allocator.h    |  48 ++
 .../runtime/bufferpool/buffer-pool-counters.h   |  47 ++
 be/src/runtime/bufferpool/buffer-pool-test.cc   | 554 +++++++++++++++++++
 be/src/runtime/bufferpool/buffer-pool.cc        | 439 +++++++++++++++
 be/src/runtime/bufferpool/buffer-pool.h         | 426 ++++++++++++++
 .../bufferpool/reservation-tracker-counters.h   |  41 ++
 .../bufferpool/reservation-tracker-test.cc      | 378 +++++++++++++
 .../runtime/bufferpool/reservation-tracker.cc   | 306 ++++++++++
 be/src/runtime/bufferpool/reservation-tracker.h | 248 +++++++++
 be/src/runtime/mem-tracker.cc                   |   2 +-
 26 files changed, 2561 insertions(+), 2563 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/.clang-tidy
----------------------------------------------------------------------
diff --git a/.clang-tidy b/.clang-tidy
index 5a2bf70..fcf5922 100644
--- a/.clang-tidy
+++ b/.clang-tidy
@@ -71,7 +71,6 @@ Checks: "-*,clang*,\
 
 HeaderFilterRegex: "be/src/\
 (benchmarks\
-|bufferpool\
 |catalog\
 |codegen\
 |common\

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 435ab19..6a076f8 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -181,7 +181,6 @@ if (DOXYGEN_FOUND)
   # Possible to not input the subdirs one by one?
   set(CMAKE_DOXYGEN_INPUT
     ${CMAKE_SOURCE_DIR}/be/src
-    ${CMAKE_SOURCE_DIR}/be/src/bufferpool/
     ${CMAKE_SOURCE_DIR}/be/src/catalog/
     ${CMAKE_SOURCE_DIR}/be/src/common/
     ${CMAKE_SOURCE_DIR}/be/src/exec/
@@ -390,7 +389,7 @@ add_custom_target(be-test)
 # Utility CMake function to make specifying tests and benchmarks less verbose
 FUNCTION(ADD_BE_TEST TEST_NAME)
   # This gets the directory where the test is from (e.g. 'exprs' or 'runtime')
-  get_filename_component(DIR_NAME ${CMAKE_CURRENT_SOURCE_DIR} NAME)
+  file(RELATIVE_PATH DIR_NAME "${CMAKE_SOURCE_DIR}/be/src/" 
${CMAKE_CURRENT_SOURCE_DIR})
   ADD_EXECUTABLE(${TEST_NAME} ${TEST_NAME}.cc)
   TARGET_LINK_LIBRARIES(${TEST_NAME} ${IMPALA_TEST_LINK_LIBS})
   set(CMAKE_EXE_LINKER_FLAGS "--start-group")
@@ -434,7 +433,6 @@ endfunction(COMPILE_TO_IR)
 add_subdirectory(src/gutil)
 
 # compile these subdirs using their own CMakeLists.txt
-add_subdirectory(src/bufferpool)
 add_subdirectory(src/catalog)
 add_subdirectory(src/codegen)
 add_subdirectory(src/common)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/bufferpool/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/CMakeLists.txt b/be/src/bufferpool/CMakeLists.txt
deleted file mode 100644
index 2f056e0..0000000
--- a/be/src/bufferpool/CMakeLists.txt
+++ /dev/null
@@ -1,32 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# where to put generated libraries
-set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/bufferpool")
-
-# where to put generated binaries
-set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/bufferpool")
-
-add_library(BufferPool
-  buffer-allocator.cc
-  buffer-pool.cc
-  reservation-tracker.cc
-)
-add_dependencies(BufferPool thrift-deps)
-
-ADD_BE_TEST(buffer-pool-test)
-ADD_BE_TEST(reservation-tracker-test)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/bufferpool/buffer-allocator.cc
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-allocator.cc 
b/be/src/bufferpool/buffer-allocator.cc
deleted file mode 100644
index 27bd788..0000000
--- a/be/src/bufferpool/buffer-allocator.cc
+++ /dev/null
@@ -1,39 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "bufferpool/buffer-allocator.h"
-
-#include "util/bit-util.h"
-
-namespace impala {
-
-BufferAllocator::BufferAllocator(int64_t min_buffer_len)
-  : min_buffer_len_(min_buffer_len) {}
-
-Status BufferAllocator::Allocate(int64_t len, uint8_t** buffer) {
-  DCHECK_GE(len, min_buffer_len_);
-  DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
-
-  *buffer = reinterpret_cast<uint8_t*>(malloc(len));
-  if (*buffer == NULL) return Status(TErrorCode::BUFFER_ALLOCATION_FAILED, 
len);
-  return Status::OK();
-}
-
-void BufferAllocator::Free(uint8_t* buffer, int64_t len) {
-  free(buffer);
-}
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/bufferpool/buffer-allocator.h
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-allocator.h 
b/be/src/bufferpool/buffer-allocator.h
deleted file mode 100644
index c3e0c70..0000000
--- a/be/src/bufferpool/buffer-allocator.h
+++ /dev/null
@@ -1,48 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_BUFFER_ALLOCATOR_H
-#define IMPALA_BUFFER_ALLOCATOR_H
-
-#include "common/status.h"
-
-namespace impala {
-
-/// The underlying memory allocator for the buffer pool. All buffers are 
allocated through
-/// the BufferPool's BufferAllocator. The allocator only handles allocating 
buffers that
-/// are power-of-two multiples of the minimum buffer length.
-///
-/// TODO:
-/// * Allocate memory with mmap() instead of malloc().
-/// * Implement free lists in the allocator or external to the allocator.
-class BufferAllocator {
- public:
-  BufferAllocator(int64_t min_buffer_len);
-
-  /// Allocate memory for a buffer of 'len' bytes. 'len' must be a 
power-of-two multiple
-  /// of the minimum buffer length.
-  Status Allocate(int64_t len, uint8_t** buffer);
-
-  /// Free the memory for a previously-allocated buffer.
-  void Free(uint8_t* buffer, int64_t len);
-
- private:
-  const int64_t min_buffer_len_;
-};
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/bufferpool/buffer-pool-counters.h
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-pool-counters.h 
b/be/src/bufferpool/buffer-pool-counters.h
deleted file mode 100644
index 6f3801e..0000000
--- a/be/src/bufferpool/buffer-pool-counters.h
+++ /dev/null
@@ -1,47 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_BUFFER_POOL_COUNTERS_H
-#define IMPALA_BUFFER_POOL_COUNTERS_H
-
-#include "util/runtime-profile.h"
-
-namespace impala {
-
-/// A set of counters for each buffer pool client.
-struct BufferPoolClientCounters {
- public:
-  /// Amount of time spent trying to get a buffer.
-  RuntimeProfile::Counter* get_buffer_time;
-
-  /// Amount of time spent waiting for reads from disk to complete.
-  RuntimeProfile::Counter* read_wait_time;
-
-  /// Amount of time spent waiting for writes to disk to complete.
-  RuntimeProfile::Counter* write_wait_time;
-
-  /// The peak total size of unpinned buffers.
-  RuntimeProfile::HighWaterMarkCounter* peak_unpinned_bytes;
-
-  /// The total bytes of data unpinned. Every time a page's pin count goes 
from 1 to 0,
-  /// this counter is incremented by the page size.
-  RuntimeProfile::Counter* total_unpinned_bytes;
-};
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-pool-test.cc 
b/be/src/bufferpool/buffer-pool-test.cc
deleted file mode 100644
index 793bcb9..0000000
--- a/be/src/bufferpool/buffer-pool-test.cc
+++ /dev/null
@@ -1,554 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <boost/bind.hpp>
-#include <boost/scoped_ptr.hpp>
-#include <boost/thread/thread.hpp>
-#include <boost/unordered_map.hpp>
-#include <cstdlib>
-#include <string>
-#include <vector>
-
-#include "bufferpool/buffer-pool.h"
-#include "bufferpool/reservation-tracker.h"
-#include "common/init.h"
-#include "common/object-pool.h"
-#include "testutil/death-test-util.h"
-#include "testutil/gtest-util.h"
-
-#include "common/names.h"
-
-namespace impala {
-
-class BufferPoolTest : public ::testing::Test {
- public:
-  virtual void SetUp() {}
-
-  virtual void TearDown() {
-    for (auto entry : query_reservations_) {
-      ReservationTracker* tracker = entry.second;
-      tracker->Close();
-    }
-
-    global_reservations_.Close();
-    obj_pool_.Clear();
-  }
-
-  /// The minimum buffer size used in most tests.
-  const static int64_t TEST_BUFFER_LEN = 1024;
-
-  /// Test helper to simulate registering then deregistering a number of 
queries with
-  /// the given initial reservation and reservation limit.
-  void RegisterQueriesAndClients(BufferPool* pool, int query_id_hi, int 
num_queries,
-      int64_t initial_query_reservation, int64_t query_reservation_limit);
-
-  /// Create and destroy a page multiple times.
-  void CreatePageLoop(BufferPool* pool, ReservationTracker* parent_tracker, 
int num_ops);
-
- protected:
-  static int64_t QueryId(int hi, int lo) { return static_cast<int64_t>(hi) << 
32 | lo; }
-
-  /// Helper function to create one reservation tracker per query.
-  ReservationTracker* GetQueryReservationTracker(int64_t query_id) {
-    lock_guard<SpinLock> l(query_reservations_lock_);
-    ReservationTracker* tracker = query_reservations_[query_id];
-    if (tracker != NULL) return tracker;
-    tracker = obj_pool_.Add(new ReservationTracker());
-    query_reservations_[query_id] = tracker;
-    return tracker;
-  }
-
-  RuntimeProfile* NewProfile() {
-    return obj_pool_.Add(new RuntimeProfile(&obj_pool_, "test profile"));
-  }
-
-  ObjectPool obj_pool_;
-
-  ReservationTracker global_reservations_;
-
-  // Map from query_id to the reservation tracker for that query. Reads and 
modifications
-  // of the map are protected by query_reservations_lock_.
-  unordered_map<int64_t, ReservationTracker*> query_reservations_;
-  SpinLock query_reservations_lock_;
-};
-
-const int64_t BufferPoolTest::TEST_BUFFER_LEN;
-
-void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int 
query_id_hi,
-    int num_queries, int64_t initial_query_reservation, int64_t 
query_reservation_limit) {
-  Status status;
-
-  int clients_per_query = 32;
-  BufferPool::Client* clients[num_queries];
-  ReservationTracker* client_reservations[num_queries];
-
-  for (int i = 0; i < num_queries; ++i) {
-    int64_t query_id = QueryId(query_id_hi, i);
-
-    // Initialize a tracker for a new query.
-    ReservationTracker* query_reservation = 
GetQueryReservationTracker(query_id);
-    query_reservation->InitChildTracker(
-        NULL, &global_reservations_, NULL, query_reservation_limit);
-
-    // Test that closing then reopening child tracker works.
-    query_reservation->Close();
-    query_reservation->InitChildTracker(
-        NULL, &global_reservations_, NULL, query_reservation_limit);
-    
EXPECT_TRUE(query_reservation->IncreaseReservationToFit(initial_query_reservation));
-
-    clients[i] = new BufferPool::Client[clients_per_query];
-    client_reservations[i] = new ReservationTracker[clients_per_query];
-
-    for (int j = 0; j < clients_per_query; ++j) {
-      int64_t initial_client_reservation =
-          initial_query_reservation / clients_per_query + j
-          < initial_query_reservation % clients_per_query;
-      // Reservation limit can be anything greater or equal to the initial 
reservation.
-      int64_t client_reservation_limit = initial_client_reservation + rand() % 
100000;
-      client_reservations[i][j].InitChildTracker(
-          NULL, query_reservation, NULL, client_reservation_limit);
-      EXPECT_TRUE(
-          
client_reservations[i][j].IncreaseReservationToFit(initial_client_reservation));
-      string name = Substitute("Client $0 for query $1", j, query_id);
-      EXPECT_OK(pool->RegisterClient(
-          name, &client_reservations[i][j], NewProfile(), &clients[i][j]));
-    }
-
-    for (int j = 0; j < clients_per_query; ++j) {
-      ASSERT_TRUE(clients[i][j].is_registered());
-    }
-  }
-
-  // Deregister clients then query.
-  for (int i = 0; i < num_queries; ++i) {
-    for (int j = 0; j < clients_per_query; ++j) {
-      pool->DeregisterClient(&clients[i][j]);
-      ASSERT_FALSE(clients[i][j].is_registered());
-      client_reservations[i][j].Close();
-    }
-
-    delete[] clients[i];
-    delete[] client_reservations[i];
-
-    GetQueryReservationTracker(QueryId(query_id_hi, i))->Close();
-  }
-}
-
-/// Test that queries and clients can be registered and deregistered with the 
reservation
-/// trackers and the buffer pool.
-TEST_F(BufferPoolTest, BasicRegistration) {
-  int num_concurrent_queries = 1024;
-  int64_t sum_initial_reservations = 4;
-  int64_t reservation_limit = 1024;
-  // Need enough buffers for all initial reservations.
-  int64_t total_mem = sum_initial_reservations * num_concurrent_queries;
-  global_reservations_.InitRootTracker(NewProfile(), total_mem);
-
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
-
-  RegisterQueriesAndClients(
-      &pool, 0, num_concurrent_queries, sum_initial_reservations, 
reservation_limit);
-
-  DCHECK_EQ(global_reservations_.GetUsedReservation(), 0);
-  DCHECK_EQ(global_reservations_.GetChildReservations(), 0);
-  DCHECK_EQ(global_reservations_.GetReservation(), 0);
-  global_reservations_.Close();
-}
-
-/// Test that queries and clients can be registered and deregistered by 
concurrent
-/// threads.
-TEST_F(BufferPoolTest, ConcurrentRegistration) {
-  int queries_per_thread = 64;
-  int num_threads = 64;
-  int num_concurrent_queries = queries_per_thread * num_threads;
-  int64_t sum_initial_reservations = 4;
-  int64_t reservation_limit = 1024;
-  // Need enough buffers for all initial reservations.
-  int64_t total_mem = num_concurrent_queries * sum_initial_reservations;
-  global_reservations_.InitRootTracker(NewProfile(), total_mem);
-
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
-
-  // Launch threads, each with a different set of query IDs.
-  thread_group workers;
-  for (int i = 0; i < num_threads; ++i) {
-    workers.add_thread(new 
thread(bind(&BufferPoolTest::RegisterQueriesAndClients, this,
-        &pool, i, queries_per_thread, sum_initial_reservations, 
reservation_limit)));
-  }
-  workers.join_all();
-
-  // All the reservations should be released at this point.
-  DCHECK_EQ(global_reservations_.GetUsedReservation(), 0);
-  DCHECK_EQ(global_reservations_.GetReservation(), 0);
-  global_reservations_.Close();
-}
-
-/// Test basic page handle creation.
-TEST_F(BufferPoolTest, PageCreation) {
-  // Allocate many pages, each a power-of-two multiple of the minimum page 
length.
-  int num_pages = 16;
-  int64_t max_page_len = TEST_BUFFER_LEN << (num_pages - 1);
-  int64_t total_mem = 2 * 2 * max_page_len;
-  global_reservations_.InitRootTracker(NULL, total_mem);
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
-  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
-  client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, 
total_mem);
-  ASSERT_TRUE(client_tracker->IncreaseReservation(total_mem));
-  BufferPool::Client client;
-  ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), 
&client));
-
-  vector<BufferPool::PageHandle> handles(num_pages);
-
-  // Create pages of various valid sizes.
-  for (int i = 0; i < num_pages; ++i) {
-    int size_multiple = 1 << i;
-    int64_t page_len = TEST_BUFFER_LEN * size_multiple;
-    int64_t used_before = client_tracker->GetUsedReservation();
-    ASSERT_OK(pool.CreatePage(&client, page_len, &handles[i]));
-    ASSERT_TRUE(handles[i].is_open());
-    ASSERT_TRUE(handles[i].is_pinned());
-    ASSERT_TRUE(handles[i].buffer_handle() != NULL);
-    ASSERT_TRUE(handles[i].data() != NULL);
-    ASSERT_EQ(handles[i].buffer_handle()->data(), handles[i].data());
-    ASSERT_EQ(handles[i].len(), page_len);
-    ASSERT_EQ(handles[i].buffer_handle()->len(), page_len);
-    DCHECK_EQ(client_tracker->GetUsedReservation(), used_before + page_len);
-  }
-
-  // Close the handles and check memory consumption.
-  for (int i = 0; i < num_pages; ++i) {
-    int64_t used_before = client_tracker->GetUsedReservation();
-    int page_len = handles[i].len();
-    pool.DestroyPage(&client, &handles[i]);
-    DCHECK_EQ(client_tracker->GetUsedReservation(), used_before - page_len);
-  }
-
-  pool.DeregisterClient(&client);
-  client_tracker->Close();
-
-  // All the reservations should be released at this point.
-  DCHECK_EQ(global_reservations_.GetReservation(), 0);
-  global_reservations_.Close();
-}
-
-TEST_F(BufferPoolTest, BufferAllocation) {
-  // Allocate many buffers, each a power-of-two multiple of the minimum buffer 
length.
-  int num_buffers = 16;
-  int64_t max_buffer_len = TEST_BUFFER_LEN << (num_buffers - 1);
-  int64_t total_mem = 2 * 2 * max_buffer_len;
-  global_reservations_.InitRootTracker(NULL, total_mem);
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
-  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
-  client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, 
total_mem);
-  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(total_mem));
-  BufferPool::Client client;
-  ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), 
&client));
-
-  vector<BufferPool::BufferHandle> handles(num_buffers);
-
-  // Create buffers of various valid sizes.
-  for (int i = 0; i < num_buffers; ++i) {
-    int size_multiple = 1 << i;
-    int64_t buffer_len = TEST_BUFFER_LEN * size_multiple;
-    int64_t used_before = client_tracker->GetUsedReservation();
-    ASSERT_OK(pool.AllocateBuffer(&client, buffer_len, &handles[i]));
-    ASSERT_TRUE(handles[i].is_open());
-    ASSERT_TRUE(handles[i].data() != NULL);
-    ASSERT_EQ(handles[i].len(), buffer_len);
-    DCHECK_EQ(client_tracker->GetUsedReservation(), used_before + buffer_len);
-  }
-
-  // Close the handles and check memory consumption.
-  for (int i = 0; i < num_buffers; ++i) {
-    int64_t used_before = client_tracker->GetUsedReservation();
-    int buffer_len = handles[i].len();
-    pool.FreeBuffer(&client, &handles[i]);
-    DCHECK_EQ(client_tracker->GetUsedReservation(), used_before - buffer_len);
-  }
-
-  pool.DeregisterClient(&client);
-  client_tracker->Close();
-
-  // All the reservations should be released at this point.
-  DCHECK_EQ(global_reservations_.GetReservation(), 0);
-  global_reservations_.Close();
-}
-
-/// Test transfer of buffer handles between clients.
-TEST_F(BufferPoolTest, BufferTransfer) {
-  // Each client needs to have enough reservation for a buffer.
-  const int num_clients = 5;
-  int64_t total_mem = num_clients * TEST_BUFFER_LEN;
-  global_reservations_.InitRootTracker(NULL, total_mem);
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
-  ReservationTracker client_trackers[num_clients];
-  BufferPool::Client clients[num_clients];
-  BufferPool::BufferHandle handles[num_clients];
-  for (int i = 0; i < num_clients; ++i) {
-    client_trackers[i].InitChildTracker(
-        NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN);
-    ASSERT_TRUE(client_trackers[i].IncreaseReservationToFit(TEST_BUFFER_LEN));
-    ASSERT_OK(pool.RegisterClient(
-        "test client", &client_trackers[i], NewProfile(), &clients[i]));
-  }
-
-  // Transfer the page around between the clients repeatedly in a circle.
-  ASSERT_OK(pool.AllocateBuffer(&clients[0], TEST_BUFFER_LEN, &handles[0]));
-  uint8_t* data = handles[0].data();
-  for (int iter = 0; iter < 10; ++iter) {
-    for (int client = 0; client < num_clients; ++client) {
-      int next_client = (client + 1) % num_clients;
-      ASSERT_OK(pool.TransferBuffer(&clients[client], &handles[client],
-          &clients[next_client], &handles[next_client]));
-      // Check that the transfer left things in a consistent state.
-      ASSERT_FALSE(handles[client].is_open());
-      ASSERT_EQ(0, client_trackers[client].GetUsedReservation());
-      ASSERT_TRUE(handles[next_client].is_open());
-      ASSERT_EQ(TEST_BUFFER_LEN, 
client_trackers[next_client].GetUsedReservation());
-      // The same underlying buffer should be used.
-      ASSERT_EQ(data, handles[next_client].data());
-    }
-  }
-
-  pool.FreeBuffer(&clients[0], &handles[0]);
-  for (int i = 0; i < num_clients; ++i) {
-    pool.DeregisterClient(&clients[i]);
-    client_trackers[i].Close();
-  }
-  DCHECK_EQ(global_reservations_.GetReservation(), 0);
-  global_reservations_.Close();
-}
-
-/// Test basic pinning and unpinning.
-TEST_F(BufferPoolTest, Pin) {
-  int64_t total_mem = TEST_BUFFER_LEN * 1024;
-  // Set up client with enough reservation to pin twice.
-  int64_t child_reservation = TEST_BUFFER_LEN * 2;
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
-  global_reservations_.InitRootTracker(NULL, total_mem);
-  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
-  client_tracker->InitChildTracker(
-      NewProfile(), &global_reservations_, NULL, child_reservation);
-  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(child_reservation));
-  BufferPool::Client client;
-  ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), 
&client));
-
-  BufferPool::PageHandle handle1, handle2;
-
-  // Can pin two minimum sized pages.
-  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1));
-  ASSERT_TRUE(handle1.is_open());
-  ASSERT_TRUE(handle1.is_pinned());
-  ASSERT_TRUE(handle1.data() != NULL);
-  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2));
-  ASSERT_TRUE(handle2.is_open());
-  ASSERT_TRUE(handle2.is_pinned());
-  ASSERT_TRUE(handle2.data() != NULL);
-
-  pool.Unpin(&client, &handle2);
-  ASSERT_FALSE(handle2.is_pinned());
-
-  // Can pin minimum-sized page twice.
-  ASSERT_OK(pool.Pin(&client, &handle1));
-  ASSERT_TRUE(handle1.is_pinned());
-  // Have to unpin twice.
-  pool.Unpin(&client, &handle1);
-  ASSERT_TRUE(handle1.is_pinned());
-  pool.Unpin(&client, &handle1);
-  ASSERT_FALSE(handle1.is_pinned());
-
-  // Can pin double-sized page only once.
-  BufferPool::PageHandle double_handle;
-  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN * 2, &double_handle));
-  ASSERT_TRUE(double_handle.is_open());
-  ASSERT_TRUE(double_handle.is_pinned());
-  ASSERT_TRUE(double_handle.data() != NULL);
-
-  // Destroy the pages - test destroying both pinned and unpinned.
-  pool.DestroyPage(&client, &handle1);
-  pool.DestroyPage(&client, &handle2);
-  pool.DestroyPage(&client, &double_handle);
-
-  pool.DeregisterClient(&client);
-  client_tracker->Close();
-}
-
-/// Creating a page or pinning without sufficient reservation should DCHECK.
-TEST_F(BufferPoolTest, PinWithoutReservation) {
-  int64_t total_mem = TEST_BUFFER_LEN * 1024;
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
-  global_reservations_.InitRootTracker(NULL, total_mem);
-  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
-  client_tracker->InitChildTracker(
-      NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN);
-  BufferPool::Client client;
-  ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), 
&client));
-
-  BufferPool::PageHandle handle;
-  IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, 
&handle), "");
-
-  // Should succeed after increasing reservation.
-  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(TEST_BUFFER_LEN));
-  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle));
-
-  // But we can't pin again.
-  IMPALA_ASSERT_DEBUG_DEATH(pool.Pin(&client, &handle), "");
-
-  pool.DestroyPage(&client, &handle);
-  pool.DeregisterClient(&client);
-  client_tracker->Close();
-}
-
-TEST_F(BufferPoolTest, ExtractBuffer) {
-  int64_t total_mem = TEST_BUFFER_LEN * 1024;
-  // Set up client with enough reservation for two buffers/pins.
-  int64_t child_reservation = TEST_BUFFER_LEN * 2;
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
-  global_reservations_.InitRootTracker(NULL, total_mem);
-  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
-  client_tracker->InitChildTracker(
-      NewProfile(), &global_reservations_, NULL, child_reservation);
-  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(child_reservation));
-  BufferPool::Client client;
-  ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), 
&client));
-
-  BufferPool::PageHandle page;
-  BufferPool::BufferHandle buffer;
-
-  // Test basic buffer extraction.
-  for (int len = TEST_BUFFER_LEN; len <= 2 * TEST_BUFFER_LEN; len *= 2) {
-    ASSERT_OK(pool.CreatePage(&client, len, &page));
-    uint8_t* page_data = page.data();
-    pool.ExtractBuffer(&client, &page, &buffer);
-    ASSERT_FALSE(page.is_open());
-    ASSERT_TRUE(buffer.is_open());
-    ASSERT_EQ(len, buffer.len());
-    ASSERT_EQ(page_data, buffer.data());
-    ASSERT_EQ(len, client_tracker->GetUsedReservation());
-    pool.FreeBuffer(&client, &buffer);
-    ASSERT_EQ(0, client_tracker->GetUsedReservation());
-  }
-
-  // Test that ExtractBuffer() accounts correctly for pin count > 1.
-  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
-  uint8_t* page_data = page.data();
-  ASSERT_OK(pool.Pin(&client, &page));
-  ASSERT_EQ(TEST_BUFFER_LEN * 2, client_tracker->GetUsedReservation());
-  pool.ExtractBuffer(&client, &page, &buffer);
-  ASSERT_EQ(TEST_BUFFER_LEN, client_tracker->GetUsedReservation());
-  ASSERT_FALSE(page.is_open());
-  ASSERT_TRUE(buffer.is_open());
-  ASSERT_EQ(TEST_BUFFER_LEN, buffer.len());
-  ASSERT_EQ(page_data, buffer.data());
-  pool.FreeBuffer(&client, &buffer);
-  ASSERT_EQ(0, client_tracker->GetUsedReservation());
-
-  // Test that ExtractBuffer() DCHECKs for unpinned pages.
-  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
-  pool.Unpin(&client, &page);
-  IMPALA_ASSERT_DEBUG_DEATH(pool.ExtractBuffer(&client, &page, &buffer), "");
-  pool.DestroyPage(&client, &page);
-
-  pool.DeregisterClient(&client);
-  client_tracker->Close();
-}
-
-// Test concurrent creation and destruction of pages.
-TEST_F(BufferPoolTest, ConcurrentPageCreation) {
-  int ops_per_thread = 1024;
-  int num_threads = 64;
-  // Need enough buffers for all initial reservations.
-  int total_mem = num_threads * TEST_BUFFER_LEN;
-  global_reservations_.InitRootTracker(NULL, total_mem);
-
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
-
-  // Launch threads, each with a different set of query IDs.
-  thread_group workers;
-  for (int i = 0; i < num_threads; ++i) {
-    workers.add_thread(new thread(bind(&BufferPoolTest::CreatePageLoop, this, 
&pool,
-        &global_reservations_, ops_per_thread)));
-  }
-
-  // Build debug string to test concurrent iteration over pages_ list.
-  for (int i = 0; i < 64; ++i) {
-    LOG(INFO) << pool.DebugString();
-  }
-  workers.join_all();
-
-  // All the reservations should be released at this point.
-  DCHECK_EQ(global_reservations_.GetChildReservations(), 0);
-  global_reservations_.Close();
-}
-
-void BufferPoolTest::CreatePageLoop(
-    BufferPool* pool, ReservationTracker* parent_tracker, int num_ops) {
-  ReservationTracker client_tracker;
-  client_tracker.InitChildTracker(NewProfile(), parent_tracker, NULL, 
TEST_BUFFER_LEN);
-  BufferPool::Client client;
-  ASSERT_OK(pool->RegisterClient("test client", &client_tracker, NewProfile(), 
&client));
-  for (int i = 0; i < num_ops; ++i) {
-    BufferPool::PageHandle handle;
-    ASSERT_TRUE(client_tracker.IncreaseReservation(TEST_BUFFER_LEN));
-    ASSERT_OK(pool->CreatePage(&client, TEST_BUFFER_LEN, &handle));
-    pool->Unpin(&client, &handle);
-    ASSERT_OK(pool->Pin(&client, &handle));
-    pool->DestroyPage(&client, &handle);
-    client_tracker.DecreaseReservation(TEST_BUFFER_LEN);
-  }
-  pool->DeregisterClient(&client);
-  client_tracker.Close();
-}
-
-/// Test error path where pool is unable to fulfill a reservation because it 
cannot evict
-/// unpinned pages.
-TEST_F(BufferPoolTest, CapacityExhausted) {
-  global_reservations_.InitRootTracker(NULL, TEST_BUFFER_LEN);
-  // TODO: once we enable spilling, set up buffer pool so that spilling is 
disabled.
-  // Set up pool with one more buffer than reservations (so that we hit the 
reservation
-  // limit instead of the buffer limit).
-  BufferPool pool(TEST_BUFFER_LEN, TEST_BUFFER_LEN * 2);
-
-  BufferPool::PageHandle handle1, handle2, handle3;
-
-  BufferPool::Client client;
-  ASSERT_OK(
-      pool.RegisterClient("test client", &global_reservations_, NewProfile(), 
&client));
-  ASSERT_TRUE(global_reservations_.IncreaseReservation(TEST_BUFFER_LEN));
-  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1));
-
-  // Do not have enough reservations because we pinned the page.
-  IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, 
&handle2), "");
-
-  // Even with reservations, we can only create one more unpinned page because 
we don't
-  // support eviction yet.
-  pool.Unpin(&client, &handle1);
-  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2));
-  pool.Unpin(&client, &handle2);
-  ASSERT_FALSE(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle3).ok());
-
-  // After destroying a page, we should have a free buffer that we can use.
-  pool.DestroyPage(&client, &handle1);
-  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle3));
-
-  pool.DestroyPage(&client, &handle2);
-  pool.DestroyPage(&client, &handle3);
-  pool.DeregisterClient(&client);
-}
-}
-
-IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-pool.cc b/be/src/bufferpool/buffer-pool.cc
deleted file mode 100644
index 3035694..0000000
--- a/be/src/bufferpool/buffer-pool.cc
+++ /dev/null
@@ -1,439 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "bufferpool/buffer-pool.h"
-
-#include <boost/bind.hpp>
-#include <limits>
-#include <sstream>
-
-#include "bufferpool/reservation-tracker.h"
-#include "common/names.h"
-#include "gutil/strings/substitute.h"
-#include "util/bit-util.h"
-#include "util/runtime-profile-counters.h"
-#include "util/uid-util.h"
-
-namespace impala {
-
-/// The internal representation of a page, which can be pinned or unpinned. If 
the
-/// page is pinned, a buffer is associated with the page.
-///
-/// Code manipulating the page is responsible for acquiring 'lock' when 
reading or
-/// modifying the page.
-struct BufferPool::Page : public BufferPool::PageList::Node {
-  Page(int64_t len) : len(len), pin_count(0), dirty(false) {}
-
-  /// Increment the pin count. Caller must hold 'lock'.
-  void IncrementPinCount(PageHandle* handle) {
-    lock.DCheckLocked();
-    ++pin_count;
-    // Pinned page buffers may be modified by anyone with a pointer to the 
buffer, so we
-    // have to assume they are dirty.
-    dirty = true;
-  }
-
-  /// Decrement the pin count. Caller must hold 'lock'.
-  void DecrementPinCount(PageHandle* handle) {
-    lock.DCheckLocked();
-    DCHECK(pin_count > 0);
-    --pin_count;
-  }
-
-  string DebugString() {
-    return Substitute("<BufferPool::Page> $0 len: $1 pin_count: $2 buf: $3 
dirty: $4", this,
-        len, pin_count, buffer.DebugString(), dirty);
-  }
-
-  // Helper for BufferPool::DebugString().
-  static bool DebugStringCallback(stringstream* ss, BufferPool::Page* page) {
-    lock_guard<SpinLock> pl(page->lock);
-    (*ss) << page->DebugString() << "\n";
-    return true;
-  }
-
-  /// The length of the page in bytes.
-  const int64_t len;
-
-  /// Lock to protect the below members of Page. The lock must be held when 
modifying any
-  /// of the below members and when reading any of the below members of an 
unpinned page.
-  SpinLock lock;
-
-  /// The pin count of the page.
-  int pin_count;
-
-  /// Buffer with the page's contents, Always open if pinned. Closed if page 
is unpinned
-  /// and was evicted from memory.
-  BufferHandle buffer;
-
-  /// True if the buffer's contents need to be saved before evicting it from 
memory.
-  bool dirty;
-};
-
-BufferPool::BufferHandle::BufferHandle() {
-  Reset();
-}
-
-BufferPool::BufferHandle::BufferHandle(BufferHandle&& src) {
-  *this = std::move(src);
-}
-
-BufferPool::BufferHandle& BufferPool::BufferHandle::operator=(BufferHandle&& 
src) {
-  DCHECK(!is_open());
-  // Copy over all members then close src.
-  client_ = src.client_;
-  data_ = src.data_;
-  len_ = src.len_;
-  src.Reset();
-  return *this;
-}
-
-void BufferPool::BufferHandle::Open(const Client* client, uint8_t* data, 
int64_t len) {
-  client_ = client;
-  data_ = data;
-  len_ = len;
-}
-
-void BufferPool::BufferHandle::Reset() {
-  client_ = NULL;
-  data_ = NULL;
-  len_ = -1;
-}
-
-BufferPool::PageHandle::PageHandle() {
-  Reset();
-}
-
-BufferPool::PageHandle::PageHandle(PageHandle&& src) {
-  *this = std::move(src);
-}
-
-BufferPool::PageHandle& BufferPool::PageHandle::operator=(PageHandle&& src) {
-  DCHECK(!is_open());
-  // Copy over all members then close src.
-  page_ = src.page_;
-  client_ = src.client_;
-  src.Reset();
-  return *this;
-}
-
-void BufferPool::PageHandle::Open(Page* page, Client* client) {
-  DCHECK(!is_open());
-  page->lock.DCheckLocked();
-  page_ = page;
-  client_ = client;
-}
-
-void BufferPool::PageHandle::Reset() {
-  page_ = NULL;
-  client_ = NULL;
-}
-
-int BufferPool::PageHandle::pin_count() const {
-  DCHECK(is_open());
-  // The pin count can only be modified via this PageHandle, which must not be
-  // concurrently accessed by multiple threads, so it is safe to access 
without locking
-  return page_->pin_count;
-}
-
-int64_t BufferPool::PageHandle::len() const {
-  DCHECK(is_open());
-  // The length of the page cannot change, so it is safe to access without 
locking.
-  return page_->len;
-}
-
-const BufferPool::BufferHandle* BufferPool::PageHandle::buffer_handle() const {
-  DCHECK(is_pinned());
-  // The 'buffer' field cannot change while the page is pinned, so it is safe 
to access
-  // without locking.
-  return &page_->buffer;
-}
-
-BufferPool::BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit)
-  : allocator_(new BufferAllocator(min_buffer_len)),
-    min_buffer_len_(min_buffer_len),
-    buffer_bytes_limit_(buffer_bytes_limit),
-    buffer_bytes_remaining_(buffer_bytes_limit) {
-  DCHECK_GT(min_buffer_len, 0);
-  DCHECK_EQ(min_buffer_len, BitUtil::RoundUpToPowerOfTwo(min_buffer_len));
-}
-
-BufferPool::~BufferPool() {
-  DCHECK(pages_.empty());
-}
-
-Status BufferPool::RegisterClient(
-    const string& name, ReservationTracker* reservation, RuntimeProfile* 
profile,
-    Client* client) {
-  DCHECK(!client->is_registered());
-  DCHECK(reservation != NULL);
-  client->InitCounters(profile);
-  client->reservation_ = reservation;
-  client->name_ = name;
-  return Status::OK();
-}
-
-void BufferPool::Client::InitCounters(RuntimeProfile* profile) {
-  counters_.get_buffer_time = ADD_TIMER(profile, "BufferPoolGetBufferTime");
-  counters_.read_wait_time = ADD_TIMER(profile, "BufferPoolReadWaitTime");
-  counters_.write_wait_time = ADD_TIMER(profile, "BufferPoolWriteWaitTime");
-  counters_.peak_unpinned_bytes =
-      profile->AddHighWaterMarkCounter("BufferPoolPeakUnpinnedBytes", 
TUnit::BYTES);
-  counters_.total_unpinned_bytes =
-      ADD_COUNTER(profile, "BufferPoolTotalUnpinnedBytes", TUnit::BYTES);
-}
-
-void BufferPool::DeregisterClient(Client* client) {
-  if (!client->is_registered()) return;
-  client->reservation_->Close();
-  client->name_.clear();
-  client->reservation_ = NULL;
-}
-
-Status BufferPool::CreatePage(Client* client, int64_t len, PageHandle* handle) 
{
-  DCHECK(!handle->is_open());
-  DCHECK_GE(len, min_buffer_len_);
-  DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
-
-  BufferHandle buffer;
-  // No changes have been made to state yet, so we can cleanly return on error.
-  RETURN_IF_ERROR(AllocateBufferInternal(client, len, &buffer));
-
-  Page* page = new Page(len);
-  {
-    lock_guard<SpinLock> pl(page->lock);
-    page->buffer = std::move(buffer);
-    handle->Open(page, client);
-    page->IncrementPinCount(handle);
-  }
-
-  // Only add to globally-visible list after page is initialized. The page 
lock also
-  // needs to be released before enqueueing to respect the lock ordering.
-  pages_.Enqueue(page);
-
-  client->reservation_->AllocateFrom(len);
-  return Status::OK();
-}
-
-void BufferPool::DestroyPage(Client* client, PageHandle* handle) {
-  if (!handle->is_open()) return; // DestroyPage() should be idempotent.
-
-  Page* page = handle->page_;
-  if (handle->is_pinned()) {
-    // In the pinned case, delegate to ExtractBuffer() and FreeBuffer() to do 
the work
-    // of cleaning up the page and freeing the buffer.
-    BufferHandle buffer;
-    ExtractBuffer(client, handle, &buffer);
-    FreeBuffer(client, &buffer);
-    return;
-  }
-
-  {
-    lock_guard<SpinLock> pl(page->lock); // Lock page while we work on its 
state.
-    // In the unpinned case, no reservation is consumed, so just free the 
buffer.
-    // TODO: wait for in-flight writes for 'page' so we can safely free 'page'.
-    if (page->buffer.is_open()) FreeBufferInternal(&page->buffer);
-  }
-  CleanUpPage(handle);
-}
-
-void BufferPool::CleanUpPage(PageHandle* handle) {
-  // Remove the destroyed page from data structures in a way that ensures no 
other
-  // threads have a remaining reference. Threads that access pages via the 
'pages_'
-  // list hold 'pages_.lock_', so Remove() will not return until those threads 
are done
-  // and it is safe to delete page.
-  pages_.Remove(handle->page_);
-  delete handle->page_;
-  handle->Reset();
-}
-
-Status BufferPool::Pin(Client* client, PageHandle* handle) {
-  DCHECK(client->is_registered());
-  DCHECK(handle->is_open());
-  DCHECK_EQ(handle->client_, client);
-
-  Page* page = handle->page_;
-  {
-    lock_guard<SpinLock> pl(page->lock); // Lock page while we work on its 
state.
-    if (page->pin_count == 0)  {
-      if (!page->buffer.is_open()) {
-        // No changes have been made to state yet, so we can cleanly return on 
error.
-        RETURN_IF_ERROR(AllocateBufferInternal(client, page->len, 
&page->buffer));
-
-        // TODO: will need to initiate/wait for read if the page is not 
in-memory.
-      }
-      COUNTER_ADD(client->counters_.peak_unpinned_bytes, -handle->len());
-    }
-    page->IncrementPinCount(handle);
-  }
-
-  client->reservation_->AllocateFrom(page->len);
-  return Status::OK();
-}
-
-void BufferPool::Unpin(Client* client, PageHandle* handle) {
-  DCHECK(handle->is_open());
-  lock_guard<SpinLock> pl(handle->page_->lock);
-  UnpinLocked(client, handle);
-}
-
-void BufferPool::UnpinLocked(Client* client, PageHandle* handle) {
-  DCHECK(client->is_registered());
-  DCHECK_EQ(handle->client_, client);
-  // If handle is pinned, we can assume that the page itself is pinned.
-  DCHECK(handle->is_pinned());
-  Page* page = handle->page_;
-  page->lock.DCheckLocked();
-
-  page->DecrementPinCount(handle);
-  client->reservation_->ReleaseTo(page->len);
-
-  COUNTER_ADD(client->counters_.total_unpinned_bytes, handle->len());
-  COUNTER_ADD(client->counters_.peak_unpinned_bytes, handle->len());
-
-  // TODO: can evict now. Only need to preserve contents if 'page->dirty' is 
true.
-}
-
-void BufferPool::ExtractBuffer(
-    Client* client, PageHandle* page_handle, BufferHandle* buffer_handle) {
-  DCHECK(page_handle->is_pinned());
-
-  DCHECK_EQ(page_handle->client_, client);
-
-  Page* page = page_handle->page_;
-  {
-    lock_guard<SpinLock> pl(page->lock); // Lock page while we work on its 
state.
-    // TODO: wait for in-flight writes for 'page' so we can safely free 'page'.
-
-    // Bring the pin count to 1 so that we're not using surplus reservations.
-    while (page->pin_count > 1) UnpinLocked(client, page_handle);
-    *buffer_handle = std::move(page->buffer);
-  }
-  CleanUpPage(page_handle);
-}
-
-Status BufferPool::AllocateBuffer(Client* client, int64_t len, BufferHandle* 
handle) {
-  client->reservation_->AllocateFrom(len);
-  return AllocateBufferInternal(client, len, handle);
-}
-
-Status BufferPool::AllocateBufferInternal(
-    Client* client, int64_t len, BufferHandle* buffer) {
-  DCHECK(!buffer->is_open());
-  DCHECK_GE(len, min_buffer_len_);
-  DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
-  SCOPED_TIMER(client->counters_.get_buffer_time);
-
-  // If there is headroom in 'buffer_bytes_remaining_', we can just allocate a 
new buffer.
-  if (TryDecreaseBufferBytesRemaining(len)) {
-    uint8_t* data;
-    Status status = allocator_->Allocate(len, &data);
-    if (!status.ok()) {
-      buffer_bytes_remaining_.Add(len);
-      return status;
-    }
-    DCHECK(data != NULL);
-    buffer->Open(client, data, len);
-    return Status::OK();
-  }
-
-  // If there is no remaining capacity, we must evict another page.
-  return Status(TErrorCode::NOT_IMPLEMENTED_ERROR,
-      Substitute("Buffer bytes limit $0 of buffer pool is exhausted and page 
eviction is "
-                 "not implemented yet!", buffer_bytes_limit_));
-}
-
-void BufferPool::FreeBuffer(Client* client, BufferHandle* handle) {
-  if (!handle->is_open()) return; // Should be idempotent.
-  DCHECK_EQ(client, handle->client_);
-  client->reservation_->ReleaseTo(handle->len_);
-  FreeBufferInternal(handle);
-}
-
-void BufferPool::FreeBufferInternal(BufferHandle* handle) {
-  DCHECK(handle->is_open());
-  allocator_->Free(handle->data(), handle->len());
-  buffer_bytes_remaining_.Add(handle->len());
-  handle->Reset();
-}
-
-Status BufferPool::TransferBuffer(
-    Client* src_client, BufferHandle* src, Client* dst_client, BufferHandle* 
dst) {
-  DCHECK(src->is_open());
-  DCHECK(!dst->is_open());
-  DCHECK_EQ(src_client, src->client_);
-  DCHECK_NE(src, dst);
-  DCHECK_NE(src_client, dst_client);
-
-  dst_client->reservation_->AllocateFrom(src->len());
-  src_client->reservation_->ReleaseTo(src->len());
-  *dst = std::move(*src);
-  dst->client_ = dst_client;
-  return Status::OK();
-}
-
-bool BufferPool::TryDecreaseBufferBytesRemaining(int64_t len) {
-  // TODO: we may want to change this policy so that we don't always use up to 
the limit
-  // for buffers, since this may starve other operators using non-buffer-pool 
memory.
-  while (true) {
-    int64_t old_value = buffer_bytes_remaining_.Load();
-    if (old_value < len) return false;
-    int64_t new_value = old_value - len;
-    if (buffer_bytes_remaining_.CompareAndSwap(old_value, new_value)) {
-      return true;
-    }
-  }
-}
-
-string BufferPool::Client::DebugString() const {
-  if (is_registered()) {
-    return Substitute("<BufferPool::Client> $0 name: $1 reservation: {$2}", 
this, name_,
-        reservation_->DebugString());
-  } else {
-    return Substitute("<BufferPool::Client> $0 UNREGISTERED", this);
-  }
-}
-
-string BufferPool::PageHandle::DebugString() const {
-  if (is_open()) {
-    lock_guard<SpinLock> pl(page_->lock);
-    return Substitute(
-        "<BufferPool::PageHandle> $0 client: {$1} page: {$2}",
-        this, client_->DebugString(), page_->DebugString());
-  } else {
-    return Substitute("<BufferPool::PageHandle> $0 CLOSED", this);
-  }
-}
-
-string BufferPool::BufferHandle::DebugString() const {
-  if (is_open()) {
-    return Substitute("<BufferPool::BufferHandle> $0 client: {$1} data: $2 
len: $3", this,
-        client_->DebugString(), data_, len_);
-  } else {
-    return Substitute("<BufferPool::BufferHandle> $0 CLOSED", this);
-  }
-}
-
-string BufferPool::DebugString() {
-  stringstream ss;
-  ss << "<BufferPool> " << this << " min_buffer_len: " << min_buffer_len_
-     << " buffer_bytes_limit: " << buffer_bytes_limit_
-     << " buffer_bytes_remaining: " << buffer_bytes_remaining_.Load() << "\n";
-  pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1));
-  return ss.str();
-}
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-pool.h b/be/src/bufferpool/buffer-pool.h
deleted file mode 100644
index 6a9641d..0000000
--- a/be/src/bufferpool/buffer-pool.h
+++ /dev/null
@@ -1,426 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_BUFFER_POOL_H
-#define IMPALA_BUFFER_POOL_H
-
-#include <stdint.h>
-#include <boost/scoped_ptr.hpp>
-#include <boost/thread/locks.hpp>
-#include <string>
-
-#include "bufferpool/buffer-allocator.h"
-#include "bufferpool/buffer-pool-counters.h"
-#include "common/atomic.h"
-#include "common/status.h"
-#include "gutil/macros.h"
-#include "util/internal-queue.h"
-#include "util/spinlock.h"
-
-namespace impala {
-
-class BufferAllocator;
-class ReservationTracker;
-
-/// A buffer pool that manages memory buffers for all queries in an Impala 
daemon.
-/// The buffer pool enforces buffer reservations, limits, and implements 
policies
-/// for moving spilled memory from in-memory buffers to disk. It also enables 
reuse of
-/// buffers between queries, to avoid frequent allocations.
-///
-/// The buffer pool can be used for allocating any large buffers (above a 
configurable
-/// minimum length), whether or not the buffers will be spilled. Smaller 
allocations
-/// are not serviced directly by the buffer pool: clients of the buffer pool 
must
-/// subdivide buffers if they wish to use smaller allocations.
-///
-/// All buffer pool operations are in the context of a registered buffer pool 
client.
-/// A buffer pool client should be created for every allocator of buffers at 
the level
-/// of granularity required for reporting and enforcement of reservations, 
e.g. an exec
-/// node. The client tracks buffer reservations via its ReservationTracker and 
also
-/// includes info that is helpful for debugging (e.g. the operator that is 
associated
-/// with the buffer). The client is not threadsafe, i.e. concurrent buffer pool
-/// operations should not be invoked for the same client.
-///
-/// TODO:
-/// * Implement spill-to-disk.
-/// * Decide on, document, and enforce upper limits on page size.
-///
-/// Pages, Buffers and Pinning
-/// ==========================
-/// * A page is a logical block of memory that can reside in memory or on disk.
-/// * A buffer is a physical block of memory that can hold a page in memory.
-/// * A page handle is used by buffer pool clients to identify and access a 
page and
-///   the corresponding buffer. Clients do not interact with pages directly.
-/// * A buffer handle is used by buffer pool clients to identify and access a 
buffer.
-/// * A page is pinned if it has pin count > 0. A pinned page stays mapped to 
the same
-///   buffer.
-/// * An unpinned page can be written out to disk by the buffer pool so that 
the buffer
-///   can be used for another purpose.
-///
-/// Buffer/Page Sizes
-/// =================
-/// The buffer pool has a minimum buffer size, which must be a power-of-two. 
Page and
-/// buffer sizes must be an exact power-of-two multiple of the minimum buffer 
size.
-///
-/// Reservations
-/// ============
-/// Before allocating buffers or pinning pages, a client must reserve memory 
through its
-/// ReservationTracker. Reservation of n bytes give a client the right to 
allocate
-/// buffers or pin pages summing up to n bytes. Reservations are both 
necessary and
-/// sufficient for a client to allocate buffers or pin pages: the operations 
succeed
-/// unless a "system error" such as a disk write error is encountered that 
prevents
-/// unpinned pages from being  to disk.
-///
-/// More memory may be reserved than is used, e.g. if a client is not using 
its full
-/// reservation. In such cases, the buffer pool can use the free buffers in 
any way,
-/// e.g. for keeping unpinned pages in memory, so long as it is able to 
fulfill the
-/// reservations when needed, e.g. by flushing unpinned pages to disk.
-///
-/// Page/Buffer Handles
-/// ===================
-/// The buffer pool exposes PageHandles and BufferHandles, which are owned by 
clients of
-/// the buffer pool, and act as a proxy for the internal data structure 
representing the
-/// page or buffer in the buffer pool. Handles are "open" if they are 
associated with a
-/// page or buffer. An open PageHandle is obtained by creating a page. 
PageHandles are
-/// closed by calling BufferPool::DestroyPage(). An open BufferHandle is 
obtained by
-/// allocating a buffer or extracting a BufferHandle from a PageHandle. A 
page's buffer
-/// can also be accessed through the PageHandle. The handle destructors check 
for
-/// resource leaks, e.g. an open handle that would result in a buffer leak.
-///
-/// Pin Counting of Page Handles:
-/// ----------------------------------
-/// Page handles are scoped to a client. The invariants are as follows:
-/// * A page can only be accessed through an open handle.
-/// * A page is destroyed once the handle is destroyed via DestroyPage().
-/// * A page's buffer can only be accessed through a pinned handle.
-/// * Pin() can be called on an open handle, incrementing the handle's pin 
count.
-/// * Unpin() can be called on a pinned handle, but not an unpinned handle.
-/// * Pin() always increases usage of reservations, and Unpin() always 
decreases usage,
-///   i.e. the handle consumes <pin count> * <page size> bytes of reservation.
-///
-/// Example Usage: Buffers
-/// ==================================
-/// The simplest use case is to allocate a memory buffer.
-/// * The new buffer is created with AllocateBuffer().
-/// * The client reads and writes to the buffer as it sees fit.
-/// * If the client is done with the buffer's contents it can call 
FreeBuffer() to
-///   destroy the handle and free the buffer, or use TransferBuffer() to 
transfer
-///   the buffer to a different client.
-///
-/// Example Usage: Spillable Pages
-/// ==============================
-/// * A spilling operator creates a new page with CreatePage().
-/// * The client reads and writes to the page's buffer as it sees fit.
-/// * If the operator encounters memory pressure, it can decrease reservation 
usage by
-///   calling Unpin() on the page. The page may then be written to disk and 
its buffer
-///   repurposed internally by BufferPool.
-/// * Once the operator needs the page's contents again and has sufficient 
unused
-///   reservation, it can call Pin(), which brings the page's contents back 
into memory,
-///   perhaps in a different buffer. Therefore the operator must fix up any 
pointers into
-///   the previous buffer.
-/// * If the operator is done with the page, it can call FreeBuffer() to 
destroy the
-///   handle and release resources, or call ExtractBuffer() to extract the 
buffer.
-///
-/// Synchronization
-/// ===============
-/// The data structures in the buffer pool itself are thread-safe. 
Client-owned data
-/// structures - Client, PageHandle and BufferHandle - are not protected from 
concurrent
-/// access by the buffer pool: clients must ensure that they do not invoke 
concurrent
-/// operations with the same Client, PageHandle or BufferHandle.
-//
-/// +========================+
-/// | IMPLEMENTATION DETAILS |
-/// +========================+
-///
-/// Lock Ordering
-/// =============
-/// The lock ordering is:
-/// * pages_::lock_ -> Page::lock_
-///
-/// If a reference to a page is acquired via the pages_ list, pages_::lock_ 
must be held
-/// until done with the page to ensure the page isn't concurrently deleted.
-class BufferPool {
- public:
-  class BufferHandle;
-  class Client;
-  class PageHandle;
-
-  /// Constructs a new buffer pool.
-  /// 'min_buffer_len': the minimum buffer length for the pool. Must be a 
power of two.
-  /// 'buffer_bytes_limit': the maximum physical memory in bytes that can be 
used by the
-  ///     buffer pool. If 'buffer_bytes_limit' is not a multiple of 
'min_buffer_len', the
-  ///     remainder will not be usable.
-  BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit);
-  ~BufferPool();
-
-  /// Register a client. Returns an error status and does not register the 
client if the
-  /// arguments are invalid. 'name' is an arbitrary name used to identify the 
client in
-  /// any errors messages or logging. Counters for this client are added to 
the (non-NULL)
-  /// 'profile'. 'client' is the client to register. 'client' should not 
already be
-  /// registered.
-  Status RegisterClient(const std::string& name, ReservationTracker* 
reservation,
-      RuntimeProfile* profile, Client* client);
-
-  /// Deregister 'client' if it is registered. Idempotent.
-  void DeregisterClient(Client* client);
-
-  /// Create a new page of 'len' bytes with pin count 1. 'len' must be a page 
length
-  /// supported by BufferPool (see BufferPool class comment). The client must 
have
-  /// sufficient unused reservation to pin the new page (otherwise it will 
DCHECK).
-  /// CreatePage() only fails when a system error prevents the buffer pool 
from fulfilling
-  /// the reservation.
-  /// On success, the handle is mapped to the new page.
-  Status CreatePage(Client* client, int64_t len, PageHandle* handle);
-
-  /// Increment the pin count of 'handle'. After Pin() the underlying page will
-  /// be mapped to a buffer, which will be accessible through 'handle'. Uses
-  /// reservation from 'client'. The caller is responsible for ensuring it has 
enough
-  /// unused reservation before calling Pin() (otherwise it will DCHECK). 
Pin() only
-  /// fails when a system error prevents the buffer pool from fulfilling the 
reservation.
-  /// 'handle' must be open.
-  Status Pin(Client* client, PageHandle* handle);
-
-  /// Decrement the pin count of 'handle'. Decrease client's reservation 
usage. If the
-  /// handle's pin count becomes zero, it is no longer valid for the 
underlying page's
-  /// buffer to be accessed via 'handle'. If the page's total pin count across 
all
-  /// handles that reference it goes to zero, the page's data may be written 
to disk and
-  /// the buffer reclaimed. 'handle' must be open and have a pin count > 0.
-  /// TODO: once we implement spilling, it will be an error to call Unpin() 
with
-  /// spilling disabled. E.g. if Impala is running without scratch (we want to 
be
-  /// able to test Unpin() before we implement actual spilling).
-  void Unpin(Client* client, PageHandle* handle);
-
-  /// Destroy the page referenced by 'handle' (if 'handle' is open). Any 
buffers or disk
-  /// storage backing the page are freed. Idempotent. If the page is pinned, 
the
-  /// reservation usage is decreased accordingly.
-  void DestroyPage(Client* client, PageHandle* handle);
-
-  /// Extracts buffer from a pinned page. After this returns, the page 
referenced by
-  /// 'page_handle' will be destroyed and 'buffer_handle' will reference the 
buffer from
-  /// 'page_handle'. This may decrease reservation usage of 'client' if the 
page was
-  /// pinned multiple times via 'page_handle'.
-  void ExtractBuffer(
-      Client* client, PageHandle* page_handle, BufferHandle* buffer_handle);
-
-  /// Allocates a new buffer of 'len' bytes. Uses reservation from 'client'. 
The caller
-  /// is responsible for ensuring it has enough unused reservation before 
calling
-  /// AllocateBuffer() (otherwise it will DCHECK). AllocateBuffer() only fails 
when
-  /// a system error prevents the buffer pool from fulfilling the reservation.
-  Status AllocateBuffer(Client* client, int64_t len, BufferHandle* handle);
-
-  /// If 'handle' is open, close 'handle', free the buffer and and decrease the
-  /// reservation usage from 'client'. Idempotent.
-  void FreeBuffer(Client* client, BufferHandle* handle);
-
-  /// Transfer ownership of buffer from 'src_client' to 'dst_client' and move 
the
-  /// handle from 'src' to 'dst'. Increases reservation usage in 'dst_client' 
and
-  /// decreases reservation usage in 'src_client'. 'src' must be open and 
'dst' must be
-  /// closed before calling. 'src'/'dst' and 'src_client'/'dst_client' must be 
different.
-  /// After a successful call, 'src' is closed and 'dst' is open.
-  Status TransferBuffer(Client* src_client, BufferHandle* src, Client* 
dst_client,
-      BufferHandle* dst);
-
-  /// Print a debug string with the state of the buffer pool.
-  std::string DebugString();
-
-  int64_t min_buffer_len() const;
-  int64_t buffer_bytes_limit() const;
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(BufferPool);
-  struct Page;
-
-  /// Same as Unpin(), except the lock for the page referenced by 'handle' 
must be held
-  /// by the caller.
-  void UnpinLocked(Client* client, PageHandle* handle);
-
-  /// Perform the cleanup of the page object and handle when the page is 
destroyed.
-  /// Reset 'handle', free the Page object and remove the 'pages_' entry.
-  /// The 'handle->page_' lock should *not* be held by the caller.
-  void CleanUpPage(PageHandle* handle);
-
-  /// Allocate a buffer of length 'len'. Assumes that the client's reservation 
has already
-  /// been consumed for the buffer. Returns an error if the pool is unable to 
fulfill the
-  /// reservation.
-  Status AllocateBufferInternal(Client* client, int64_t len, BufferHandle* 
buffer);
-
-  /// Frees 'buffer', which must be open before calling. Closes 'buffer' and 
updates
-  /// internal state but does not release to any reservation.
-  void FreeBufferInternal(BufferHandle* buffer);
-
-  /// Check if we can allocate another buffer of size 'len' bytes without
-  /// 'buffer_bytes_remaining_' going negative.
-  /// Returns true and decrease 'buffer_bytes_remaining_' by 'len' if 
successful.
-  bool TryDecreaseBufferBytesRemaining(int64_t len);
-
-  /// Allocator for allocating and freeing all buffer memory.
-  boost::scoped_ptr<BufferAllocator> allocator_;
-
-  /// The minimum length of a buffer in bytes. All buffers and pages are a 
power-of-two
-  /// multiple of this length. This is always a power of two.
-  const int64_t min_buffer_len_;
-
-  /// The maximum physical memory in bytes that can be used for buffers.
-  const int64_t buffer_bytes_limit_;
-
-  /// The remaining number of bytes of 'buffer_bytes_limit_' that can be used 
for
-  /// allocating new buffers. Must be updated atomically before a new buffer is
-  /// allocated or after an existing buffer is freed.
-  AtomicInt64 buffer_bytes_remaining_;
-
-  /// List containing all pages. Protected by the list's internal lock.
-  typedef InternalQueue<Page> PageList;
-  PageList pages_;
-};
-
-/// External representation of a client of the BufferPool. Clients are used for
-/// reservation accounting, and will be used in the future for tracking 
per-client
-/// buffer pool counters. This class is the external handle for a client so
-/// each Client instance is owned by the BufferPool's client, rather than the 
BufferPool.
-/// Each Client should only be used by a single thread at a time: concurrently 
calling
-/// Client methods or BufferPool methods with the Client as an argument is not 
supported.
-class BufferPool::Client {
- public:
-  Client() : reservation_(NULL) {}
-  /// Client must be deregistered.
-  ~Client() { DCHECK(!is_registered()); }
-
-  bool is_registered() const { return reservation_ != NULL; }
-  ReservationTracker* reservation() { return reservation_; }
-
-  std::string DebugString() const;
-
- private:
-  friend class BufferPool;
-  DISALLOW_COPY_AND_ASSIGN(Client);
-
-  /// Initialize 'counters_' and add the counters to 'profile'.
-  void InitCounters(RuntimeProfile* profile);
-
-  /// A name identifying the client.
-  std::string name_;
-
-  /// The reservation tracker for the client. NULL means the client isn't 
registered.
-  /// All pages pinned by the client count as usage against 'reservation_'.
-  ReservationTracker* reservation_;
-
-  /// The RuntimeProfile counters for this client. All non-NULL if 
is_registered()
-  /// is true.
-  BufferPoolClientCounters counters_;
-};
-
-/// A handle to a buffer allocated from the buffer pool. Each BufferHandle 
should only
-/// be used by a single thread at a time: concurrently calling BufferHandle 
methods or
-/// BufferPool methods with the BufferHandle as an argument is not supported.
-class BufferPool::BufferHandle {
- public:
-  BufferHandle();
-  ~BufferHandle() { DCHECK(!is_open()); }
-
-  /// Allow move construction of handles, to support std::move().
-  BufferHandle(BufferHandle&& src);
-
-  /// Allow move assignment of handles, to support STL classes like 
std::vector.
-  /// Destination must be uninitialized.
-  BufferHandle& operator=(BufferHandle&& src);
-
-  bool is_open() const { return data_ != NULL; }
-  int64_t len() const {
-    DCHECK(is_open());
-    return len_;
-  }
-  /// Get a pointer to the start of the buffer.
-  uint8_t* data() const {
-    DCHECK(is_open());
-    return data_;
-  }
-
-  std::string DebugString() const;
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(BufferHandle);
-  friend class BufferPool;
-
-  /// Internal helper to set the handle to an opened state.
-  void Open(const Client* client, uint8_t* data, int64_t len);
-
-  /// Internal helper to reset the handle to an unopened state.
-  void Reset();
-
-  /// The client the buffer handle belongs to, used to validate that the 
correct client
-  /// is provided in BufferPool method calls.
-  const Client* client_;
-
-  /// Pointer to the start of the buffer. Non-NULL if open, NULL if closed.
-  uint8_t* data_;
-
-  /// Length of the buffer in bytes.
-  int64_t len_;
-};
-
-/// The handle for a page used by clients of the BufferPool. Each PageHandle 
should
-/// only be used by a single thread at a time: concurrently calling PageHandle 
methods
-/// or BufferPool methods with the PageHandle as an argument is not supported.
-class BufferPool::PageHandle {
- public:
-  PageHandle();
-  ~PageHandle() { DCHECK(!is_open()); }
-
-  // Allow move construction of page handles, to support std::move().
-  PageHandle(PageHandle&& src);
-
-  // Allow move assignment of page handles, to support STL classes like 
std::vector.
-  // Destination must be closed.
-  PageHandle& operator=(PageHandle&& src);
-
-  bool is_open() const { return page_ != NULL; }
-  bool is_pinned() const { return pin_count() > 0; }
-  int pin_count() const;
-  int64_t len() const;
-  /// Get a pointer to the start of the page's buffer. Only valid to call if 
the page
-  /// is pinned via this handle.
-  uint8_t* data() const { return buffer_handle()->data(); }
-
-  /// Return a pointer to the page's buffer handle. Only valid to call if the 
page is
-  /// pinned via this handle. Only const accessors of the returned handle can 
be used:
-  /// it is invalid to call FreeBuffer() or TransferBuffer() on it or to 
otherwise modify
-  /// the handle.
-  const BufferHandle* buffer_handle() const;
-
-  std::string DebugString() const;
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(PageHandle);
-  friend class BufferPool;
-  friend class Page;
-
-  /// Internal helper to open the handle for the given page.
-  void Open(Page* page, Client* client);
-
-  /// Internal helper to reset the handle to an unopened state.
-  void Reset();
-
-  /// The internal page structure. NULL if the handle is not open.
-  Page* page_;
-
-  /// The client the page handle belongs to, used to validate that the correct 
client
-  /// is being used.
-  const Client* client_;
-};
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/bufferpool/reservation-tracker-counters.h
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/reservation-tracker-counters.h 
b/be/src/bufferpool/reservation-tracker-counters.h
deleted file mode 100644
index 0952a2f..0000000
--- a/be/src/bufferpool/reservation-tracker-counters.h
+++ /dev/null
@@ -1,41 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_RESERVATION_TRACKER_COUNTERS_H
-#define IMPALA_RESERVATION_TRACKER_COUNTERS_H
-
-#include "util/runtime-profile.h"
-
-namespace impala {
-
-/// A set of counters for each ReservationTracker for reporting purposes.
-///
-/// If the ReservationTracker is linked to a profile these have the same 
lifetime as that
-/// profile, otherwise they have the same lifetime as the ReservationTracker 
itself.
-struct ReservationTrackerCounters {
-  /// The tracker's peak reservation in bytes.
-  RuntimeProfile::HighWaterMarkCounter* peak_reservation;
-
-  /// The tracker's peak usage in bytes.
-  RuntimeProfile::HighWaterMarkCounter* peak_used_reservation;
-
-  /// The hard limit on the tracker's reservations
-  RuntimeProfile::Counter* reservation_limit;
-};
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac44d6c3/be/src/bufferpool/reservation-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/reservation-tracker-test.cc 
b/be/src/bufferpool/reservation-tracker-test.cc
deleted file mode 100644
index 66ce287..0000000
--- a/be/src/bufferpool/reservation-tracker-test.cc
+++ /dev/null
@@ -1,378 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <limits>
-#include <string>
-#include <vector>
-
-#include "bufferpool/reservation-tracker.h"
-#include "common/init.h"
-#include "common/object-pool.h"
-#include "runtime/mem-tracker.h"
-#include "testutil/gtest-util.h"
-
-#include "common/names.h"
-
-namespace impala {
-
-class ReservationTrackerTest : public ::testing::Test {
- public:
-  virtual void SetUp() {}
-
-  virtual void TearDown() {
-    root_.Close();
-    obj_pool_.Clear();
-  }
-
-  /// The minimum allocation size used in most tests.
-  const static int64_t MIN_BUFFER_LEN = 1024;
-
- protected:
-  RuntimeProfile* NewProfile() {
-    return obj_pool_.Add(new RuntimeProfile(&obj_pool_, "test profile"));
-  }
-
-  ObjectPool obj_pool_;
-
-  ReservationTracker root_;
-
-  scoped_ptr<RuntimeProfile> profile_;
-};
-
-const int64_t ReservationTrackerTest::MIN_BUFFER_LEN;
-
-TEST_F(ReservationTrackerTest, BasicSingleTracker) {
-  const int64_t limit = 16;
-  root_.InitRootTracker(NULL, limit);
-  ASSERT_EQ(0, root_.GetReservation());
-  ASSERT_EQ(0, root_.GetUsedReservation());
-
-  // Fail to increase reservation.
-  ASSERT_FALSE(root_.IncreaseReservation(limit + 1));
-  ASSERT_EQ(0, root_.GetReservation());
-  ASSERT_EQ(0, root_.GetUsedReservation());
-  ASSERT_EQ(0, root_.GetUnusedReservation());
-
-  // Successfully increase reservation.
-  ASSERT_TRUE(root_.IncreaseReservation(limit - 1));
-  ASSERT_EQ(limit - 1, root_.GetReservation());
-  ASSERT_EQ(0, root_.GetUsedReservation());
-  ASSERT_EQ(limit - 1, root_.GetUnusedReservation());
-
-  // Adjust usage.
-  root_.AllocateFrom(2);
-  ASSERT_EQ(limit - 1, root_.GetReservation());
-  ASSERT_EQ(2, root_.GetUsedReservation());
-  ASSERT_EQ(limit - 3, root_.GetUnusedReservation());
-  root_.ReleaseTo(1);
-  ASSERT_EQ(1, root_.GetUsedReservation());
-  root_.ReleaseTo(1);
-  ASSERT_EQ(0, root_.GetUsedReservation());
-  ASSERT_EQ(limit - 1, root_.GetReservation());
-  ASSERT_EQ(limit - 1, root_.GetUnusedReservation());
-}
-
-TEST_F(ReservationTrackerTest, BasicTwoLevel) {
-  const int64_t limit = 16;
-  root_.InitRootTracker(NULL, limit);
-
-  const int64_t root_reservation = limit / 2;
-  // Get half of the limit as an initial reservation.
-  ASSERT_TRUE(root_.IncreaseReservation(root_reservation));
-  ASSERT_EQ(root_reservation, root_.GetReservation());
-  ASSERT_EQ(root_reservation, root_.GetUnusedReservation());
-  ASSERT_EQ(0, root_.GetUsedReservation());
-  ASSERT_EQ(0, root_.GetChildReservations());
-
-  ReservationTracker child;
-  child.InitChildTracker(NULL, &root_, NULL, numeric_limits<int64_t>::max());
-
-  const int64_t child_reservation = root_reservation + 1;
-  // Get parent's reservation plus a bit more.
-  ASSERT_TRUE(child.IncreaseReservation(child_reservation));
-
-  ASSERT_EQ(child_reservation, root_.GetReservation());
-  ASSERT_EQ(0, root_.GetUnusedReservation());
-  ASSERT_EQ(0, root_.GetUsedReservation());
-  ASSERT_EQ(child_reservation, root_.GetChildReservations());
-
-  ASSERT_EQ(child_reservation, child.GetReservation());
-  ASSERT_EQ(child_reservation, child.GetUnusedReservation());
-  ASSERT_EQ(0, child.GetUsedReservation());
-  ASSERT_EQ(0, child.GetChildReservations());
-
-  // Check that child allocation is reflected correctly.
-  child.AllocateFrom(1);
-  ASSERT_EQ(child_reservation, child.GetReservation());
-  ASSERT_EQ(1, child.GetUsedReservation());
-  ASSERT_EQ(0, root_.GetUsedReservation());
-
-  // Check that parent reservation increase is reflected correctly.
-  ASSERT_TRUE(root_.IncreaseReservation(1));
-  ASSERT_EQ(child_reservation + 1, root_.GetReservation());
-  ASSERT_EQ(1, root_.GetUnusedReservation());
-  ASSERT_EQ(0, root_.GetUsedReservation());
-  ASSERT_EQ(child_reservation, root_.GetChildReservations());
-  ASSERT_EQ(child_reservation, child.GetReservation());
-
-  // Check that parent allocation is reflected correctly.
-  root_.AllocateFrom(1);
-  ASSERT_EQ(child_reservation + 1, root_.GetReservation());
-  ASSERT_EQ(0, root_.GetUnusedReservation());
-  ASSERT_EQ(1, root_.GetUsedReservation());
-
-  // Release allocations.
-  root_.ReleaseTo(1);
-  ASSERT_EQ(0, root_.GetUsedReservation());
-  child.ReleaseTo(1);
-  ASSERT_EQ(0, child.GetUsedReservation());
-
-  // Child reservation should be returned all the way up the tree.
-  child.DecreaseReservation(1);
-  ASSERT_EQ(child_reservation, root_.GetReservation());
-  ASSERT_EQ(child_reservation - 1, child.GetReservation());
-  ASSERT_EQ(child_reservation - 1, root_.GetChildReservations());
-
-  // Closing the child should release its reservation.
-  child.Close();
-  ASSERT_EQ(1, root_.GetReservation());
-  ASSERT_EQ(0, root_.GetChildReservations());
-}
-
-TEST_F(ReservationTrackerTest, CloseIdempotency) {
-  // Check we can close before opening.
-  root_.Close();
-
-  const int64_t limit = 16;
-  root_.InitRootTracker(NULL, limit);
-
-  // Check we can close twice
-  root_.Close();
-  root_.Close();
-}
-
-// Test that the tracker's reservation limit is enforced.
-TEST_F(ReservationTrackerTest, ReservationLimit) {
-  Status status;
-  // Setup trackers so that there is a spare buffer that the client isn't 
entitled to.
-  int64_t total_mem = MIN_BUFFER_LEN * 3;
-  int64_t client_limit = MIN_BUFFER_LEN * 2;
-  root_.InitRootTracker(NULL, total_mem);
-
-  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
-  client_tracker->InitChildTracker(NULL, &root_, NULL, client_limit);
-
-  // We can only increase reservation up to the client's limit.
-  ASSERT_FALSE(client_tracker->IncreaseReservation(client_limit + 1));
-  ASSERT_TRUE(client_tracker->IncreaseReservation(client_limit));
-  ASSERT_FALSE(client_tracker->IncreaseReservation(1));
-
-  client_tracker->Close();
-}
-
-// Test that parent's reservation limit is enforced.
-TEST_F(ReservationTrackerTest, ParentReservationLimit) {
-  Status status;
-  // Setup reservations so that there is a spare buffer.
-  int64_t total_mem = MIN_BUFFER_LEN * 4;
-  int64_t parent_limit = MIN_BUFFER_LEN * 3;
-  int64_t other_client_reservation = MIN_BUFFER_LEN;
-  root_.InitRootTracker(NULL, total_mem);
-
-  // The child reservation limit is higher than the parent reservation limit, 
so the
-  // parent limit is the effective limit.
-  ReservationTracker* query_tracker = obj_pool_.Add(new ReservationTracker());
-  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
-  query_tracker->InitChildTracker(NULL, &root_, NULL, parent_limit);
-  client_tracker->InitChildTracker(NULL, query_tracker, NULL, total_mem * 10);
-
-  ReservationTracker* other_client_tracker = obj_pool_.Add(new 
ReservationTracker());
-  other_client_tracker->InitChildTracker(NULL, query_tracker, NULL, total_mem);
-  
ASSERT_TRUE(other_client_tracker->IncreaseReservationToFit(other_client_reservation));
-  ASSERT_EQ(root_.GetUsedReservation(), 0);
-  ASSERT_EQ(root_.GetChildReservations(), other_client_reservation);
-  ASSERT_EQ(query_tracker->GetUsedReservation(), 0);
-  ASSERT_EQ(query_tracker->GetUnusedReservation(), 0);
-
-  // Can only increase reservation up to parent limit, excluding other 
reservations.
-  int64_t effective_limit = parent_limit - other_client_reservation;
-  ASSERT_FALSE(client_tracker->IncreaseReservation(effective_limit + 
MIN_BUFFER_LEN));
-  ASSERT_TRUE(client_tracker->IncreaseReservation(effective_limit));
-  ASSERT_FALSE(client_tracker->IncreaseReservation(MIN_BUFFER_LEN));
-
-  // Check that tracker hierarchy reports correct usage.
-  ASSERT_EQ(root_.GetUsedReservation(), 0);
-  ASSERT_EQ(root_.GetChildReservations(), parent_limit);
-  ASSERT_EQ(query_tracker->GetUsedReservation(), 0);
-  ASSERT_EQ(query_tracker->GetUnusedReservation(), 0);
-
-  client_tracker->Close();
-  other_client_tracker->Close();
-  query_tracker->Close();
-}
-
-/// Test integration of ReservationTracker with MemTracker.
-TEST_F(ReservationTrackerTest, MemTrackerIntegrationTwoLevel) {
-  // Setup a 2-level hierarchy of trackers. The child ReservationTracker is 
linked to
-  // the child MemTracker. We add various limits at different places to enable 
testing
-  // of different code paths.
-  root_.InitRootTracker(NewProfile(), MIN_BUFFER_LEN * 100);
-  MemTracker root_mem_tracker;
-  MemTracker child_mem_tracker1(-1, "Child 1", &root_mem_tracker);
-  MemTracker child_mem_tracker2(MIN_BUFFER_LEN * 50, "Child 2", 
&root_mem_tracker);
-  ReservationTracker child_reservations1, child_reservations2;
-  child_reservations1.InitChildTracker(
-      NewProfile(), &root_, &child_mem_tracker1, 500 * MIN_BUFFER_LEN);
-  child_reservations2.InitChildTracker(
-      NewProfile(), &root_, &child_mem_tracker2, 75 * MIN_BUFFER_LEN);
-
-  // Check that a single buffer reservation is accounted correctly.
-  ASSERT_TRUE(child_reservations1.IncreaseReservation(MIN_BUFFER_LEN));
-  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation());
-  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption());
-  ASSERT_EQ(MIN_BUFFER_LEN, root_mem_tracker.consumption());
-  ASSERT_EQ(MIN_BUFFER_LEN, root_.GetChildReservations());
-
-  // Check that a buffer reservation from the other child is accounted 
correctly.
-  ASSERT_TRUE(child_reservations2.IncreaseReservation(MIN_BUFFER_LEN));
-  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation());
-  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption());
-  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption());
-  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations());
-
-  // Check that hitting the MemTracker limit leaves things in a consistent 
state.
-  ASSERT_FALSE(child_reservations2.IncreaseReservation(MIN_BUFFER_LEN * 50));
-  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation());
-  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption());
-  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation());
-  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption());
-  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption());
-  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations());
-
-  // Check that hitting the ReservationTracker's local limit leaves things in a
-  // consistent state.
-  ASSERT_FALSE(child_reservations2.IncreaseReservation(MIN_BUFFER_LEN * 75));
-  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation());
-  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption());
-  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation());
-  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption());
-  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption());
-  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations());
-
-  // Check that hitting the ReservationTracker's parent's limit after the
-  // MemTracker consumption is incremented leaves things in a consistent state.
-  ASSERT_FALSE(child_reservations1.IncreaseReservation(MIN_BUFFER_LEN * 100));
-  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation());
-  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption());
-  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation());
-  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption());
-  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption());
-  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations());
-
-  // Check that released memory is decremented from all trackers correctly.
-  child_reservations1.DecreaseReservation(MIN_BUFFER_LEN);
-  child_reservations2.DecreaseReservation(MIN_BUFFER_LEN);
-  ASSERT_EQ(0, child_reservations2.GetReservation());
-  ASSERT_EQ(0, child_mem_tracker2.consumption());
-  ASSERT_EQ(0, root_mem_tracker.consumption());
-  ASSERT_EQ(0, root_.GetUsedReservation());
-
-  child_reservations1.Close();
-  child_reservations2.Close();
-  child_mem_tracker1.UnregisterFromParent();
-  child_mem_tracker2.UnregisterFromParent();
-}
-
-TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) {
-  const int HIERARCHY_DEPTH = 5;
-  // Setup a multi-level hierarchy of trackers and ensure that consumption is 
reported
-  // correctly.
-  ReservationTracker reservations[HIERARCHY_DEPTH];
-  scoped_ptr<MemTracker> mem_trackers[HIERARCHY_DEPTH];
-
-  // We can only handle MemTracker limits at the topmost linked 
ReservationTracker,
-  // so avoid adding limits at lower level.
-  const int LIMIT = HIERARCHY_DEPTH;
-  vector<int> mem_limits({LIMIT * 10, LIMIT, -1, -1, -1});
-
-  // Root trackers aren't linked.
-  mem_trackers[0].reset(new MemTracker(mem_limits[0]));
-  reservations[0].InitRootTracker(NewProfile(), 500);
-  for (int i = 1; i < HIERARCHY_DEPTH; ++i) {
-    mem_trackers[i].reset(new MemTracker(
-        mem_limits[i], Substitute("Child $0", i), mem_trackers[i - 1].get()));
-    reservations[i].InitChildTracker(
-        NewProfile(), &reservations[i - 1], mem_trackers[i].get(), 500);
-  }
-
-  vector<int> interesting_amounts({LIMIT - 1, LIMIT, LIMIT + 1});
-
-  // Test that all limits and consumption correctly reported when consuming
-  // from a non-root ReservationTracker that is connected to a MemTracker.
-  for (int level = 1; level < HIERARCHY_DEPTH; ++level) {
-    int64_t lowest_limit = mem_trackers[level]->lowest_limit();
-    for (int amount : interesting_amounts) {
-      bool increased = reservations[level].IncreaseReservation(amount);
-      if (lowest_limit == -1 || amount <= lowest_limit) {
-        // The increase should go through.
-        ASSERT_TRUE(increased) << reservations[level].DebugString();
-        ASSERT_EQ(amount, reservations[level].GetReservation());
-        ASSERT_EQ(amount, mem_trackers[level]->consumption());
-        for (int ancestor = 0; ancestor < level; ++ancestor) {
-          ASSERT_EQ(amount, reservations[ancestor].GetChildReservations());
-          ASSERT_EQ(amount, mem_trackers[ancestor]->consumption());
-        }
-
-        LOG(INFO) << "\n" << mem_trackers[0]->LogUsage();
-        reservations[level].DecreaseReservation(amount);
-      } else {
-        ASSERT_FALSE(increased);
-      }
-      // We should be back in the original state.
-      for (int i = 0; i < HIERARCHY_DEPTH; ++i) {
-        ASSERT_EQ(0, reservations[i].GetReservation()) << i << ": "
-                                                       << 
reservations[i].DebugString();
-        ASSERT_EQ(0, reservations[i].GetChildReservations());
-        ASSERT_EQ(0, mem_trackers[i]->consumption());
-      }
-    }
-  }
-
-  // "Pull down" a reservation from the top of the hierarchy level-by-level to 
the
-  // leaves, checking that everything is consistent at each step.
-  for (int level = 0; level < HIERARCHY_DEPTH; ++level) {
-    const int amount = LIMIT;
-    ASSERT_TRUE(reservations[level].IncreaseReservation(amount));
-    ASSERT_EQ(amount, reservations[level].GetReservation());
-    ASSERT_EQ(0, reservations[level].GetUsedReservation());
-    if (level != 0) ASSERT_EQ(amount, mem_trackers[level]->consumption());
-    for (int ancestor = 0; ancestor < level; ++ancestor) {
-      ASSERT_EQ(0, reservations[ancestor].GetUsedReservation());
-      ASSERT_EQ(amount, reservations[ancestor].GetChildReservations());
-      ASSERT_EQ(amount, mem_trackers[ancestor]->consumption());
-    }
-    reservations[level].DecreaseReservation(amount);
-  }
-
-  for (int i = HIERARCHY_DEPTH - 1; i >= 0; --i) {
-    reservations[i].Close();
-    if (i != 0) mem_trackers[i]->UnregisterFromParent();
-  }
-}
-}
-
-IMPALA_TEST_MAIN();


Reply via email to