Repository: incubator-impala Updated Branches: refs/heads/master 218019e59 -> f9ac593ff
IMPALA-3201: reservation implementation for new buffer pool This patch implements the reservation bookkeeping logic for the new buffer pool. The reservations are always guaranteed and any buffer/page allocations require a reservation. Reservations are tracked via a hierarchy of ReservationTrackers. Eventually ReservationTrackers should become the main mechanism for memory accounting, once all runtime memory is handled by the buffer pool. In the meantime, query/process limits are enforced and memory is reported through the preexisting MemTracker hierarchy. Reservations are accounted as consumption against a MemTracker because the memory is committed and cannot be used for other purposes. The MemTracker then uses the counters from the ReservationTracker to log information about buffer-pool memory down to the operator level. Testing: Includes basic tests for buffer pool client registration and various reservation operations. Change-Id: I35cc89e863efb4cc506657bfdaaaf633a10bbab6 Reviewed-on: http://gerrit.cloudera.org:8080/3993 Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f9ac593f Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f9ac593f Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f9ac593f Branch: refs/heads/master Commit: f9ac593ff9a9ea3a5b329eed618500c45a1e7400 Parents: 218019e Author: Tim Armstrong <[email protected]> Authored: Wed Jun 22 18:24:04 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Sun Sep 11 00:36:12 2016 +0000 ---------------------------------------------------------------------- be/CMakeLists.txt | 5 + be/src/bufferpool/CMakeLists.txt | 31 ++ be/src/bufferpool/buffer-pool-test.cc | 231 +++++++++++ be/src/bufferpool/buffer-pool.cc | 69 ++++ be/src/bufferpool/buffer-pool.h | 34 +- 
.../bufferpool/reservation-tracker-counters.h | 41 ++ be/src/bufferpool/reservation-tracker-test.cc | 385 +++++++++++++++++++ be/src/bufferpool/reservation-tracker.cc | 308 +++++++++++++++ be/src/bufferpool/reservation-tracker.h | 248 ++++++++++++ be/src/runtime/mem-tracker.cc | 61 ++- be/src/runtime/mem-tracker.h | 12 + be/src/util/dummy-runtime-profile.h | 39 ++ be/src/util/runtime-profile-counters.h | 1 - 13 files changed, 1439 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index 6aff0b3..2a46239 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -162,6 +162,7 @@ if (DOXYGEN_FOUND) # Possible to not input the subdirs one by one? set(CMAKE_DOXYGEN_INPUT ${CMAKE_SOURCE_DIR}/be/src + ${CMAKE_SOURCE_DIR}/be/src/bufferpool/ ${CMAKE_SOURCE_DIR}/be/src/catalog/ ${CMAKE_SOURCE_DIR}/be/src/common/ ${CMAKE_SOURCE_DIR}/be/src/exec/ @@ -258,6 +259,7 @@ if (NOT APPLE) endif() set (IMPALA_LINK_LIBS ${WL_START_GROUP} + BufferPool Catalog CodeGen Common @@ -283,6 +285,7 @@ set (IMPALA_LINK_LIBS # libraries when dynamic linking is enabled. 
if (BUILD_SHARED_LIBS) set (IMPALA_LINK_LIBS ${IMPALA_LINK_LIBS} + BufferPool Runtime Exec CodeGen @@ -414,6 +417,7 @@ endfunction(COMPILE_TO_IR) add_subdirectory(src/gutil) # compile these subdirs using their own CMakeLists.txt +add_subdirectory(src/bufferpool) add_subdirectory(src/catalog) add_subdirectory(src/codegen) add_subdirectory(src/common) @@ -441,6 +445,7 @@ SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-variable") add_subdirectory(generated-sources/gen-cpp) link_directories( + ${CMAKE_CURRENT_SOURCE_DIR}/build/bufferpool ${CMAKE_CURRENT_SOURCE_DIR}/build/catalog ${CMAKE_CURRENT_SOURCE_DIR}/build/common ${CMAKE_CURRENT_SOURCE_DIR}/build/exec http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/bufferpool/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/CMakeLists.txt b/be/src/bufferpool/CMakeLists.txt new file mode 100644 index 0000000..0930f73 --- /dev/null +++ b/be/src/bufferpool/CMakeLists.txt @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# where to put generated libraries +set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/bufferpool") + +# where to put generated binaries +set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/bufferpool") + +add_library(BufferPool + buffer-pool.cc + reservation-tracker.cc +) +add_dependencies(BufferPool thrift-deps) + +ADD_BE_TEST(buffer-pool-test) +ADD_BE_TEST(reservation-tracker-test) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/bufferpool/buffer-pool-test.cc ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/buffer-pool-test.cc b/be/src/bufferpool/buffer-pool-test.cc new file mode 100644 index 0000000..cdb163d --- /dev/null +++ b/be/src/bufferpool/buffer-pool-test.cc @@ -0,0 +1,231 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <gtest/gtest.h> +#include <boost/bind.hpp> +#include <boost/scoped_ptr.hpp> +#include <boost/thread/thread.hpp> +#include <boost/unordered_map.hpp> +#include <cstdlib> +#include <string> +#include <vector> + +#include "bufferpool/buffer-pool.h" +#include "bufferpool/reservation-tracker.h" +#include "common/init.h" +#include "common/object-pool.h" +#include "testutil/test-macros.h" + +#include "common/names.h" + +using strings::Substitute; + +namespace impala { + +class BufferPoolTest : public ::testing::Test { + public: + virtual void SetUp() {} + + virtual void TearDown() { + for (auto entry : query_reservations_) { + ReservationTracker* tracker = entry.second; + tracker->Close(); + } + + global_reservations_.Close(); + obj_pool_.Clear(); + } + + /// The minimum buffer size used in most tests. + const static int64_t TEST_PAGE_LEN = 1024; + + /// Test helper to simulate registering then deregistering a number of queries with + /// the given initial reservation and reservation limit. + void RegisterQueriesAndClients(BufferPool* pool, int query_id_hi, int num_queries, + int64_t initial_query_reservation, int64_t query_reservation_limit); + + protected: + static int64_t QueryId(int hi, int lo) { return static_cast<int64_t>(hi) << 32 | lo; } + + /// Helper function to create one reservation tracker per query. + ReservationTracker* GetQueryReservationTracker(int64_t query_id) { + lock_guard<SpinLock> l(query_reservations_lock_); + ReservationTracker* tracker = query_reservations_[query_id]; + if (tracker != NULL) return tracker; + tracker = obj_pool_.Add(new ReservationTracker()); + query_reservations_[query_id] = tracker; + return tracker; + } + + RuntimeProfile* NewProfile() { + return obj_pool_.Add(new RuntimeProfile(&obj_pool_, "test profile")); + } + + ObjectPool obj_pool_; + + ReservationTracker global_reservations_; + + // Map from query_id to the reservation tracker for that query. 
Reads and modifications + // of the map are protected by query_reservations_lock_. + unordered_map<int64_t, ReservationTracker*> query_reservations_; + SpinLock query_reservations_lock_; +}; + +const int64_t BufferPoolTest::TEST_PAGE_LEN; + +void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi, + int num_queries, int64_t initial_query_reservation, int64_t query_reservation_limit) { + Status status; + + int clients_per_query = 32; + BufferPool::Client* clients[num_queries]; + ReservationTracker* client_reservations[num_queries]; + + for (int i = 0; i < num_queries; ++i) { + int64_t query_id = QueryId(query_id_hi, i); + + // Initialize a tracker for a new query. + ReservationTracker* query_reservation = GetQueryReservationTracker(query_id); + query_reservation->InitChildTracker( + NULL, &global_reservations_, NULL, query_reservation_limit); + + // Test that closing then reopening child tracker works. + query_reservation->Close(); + query_reservation->InitChildTracker( + NULL, &global_reservations_, NULL, query_reservation_limit); + EXPECT_TRUE(query_reservation->IncreaseReservationToFit(initial_query_reservation)); + + clients[i] = new BufferPool::Client[clients_per_query]; + client_reservations[i] = new ReservationTracker[clients_per_query]; + + for (int j = 0; j < clients_per_query; ++j) { + // Split the query's reservation evenly across its clients; the first + // (reservation % clients_per_query) clients each take one extra byte so that + // the client reservations sum to exactly the query's initial reservation. + const bool gets_extra_byte = j < initial_query_reservation % clients_per_query; + int64_t initial_client_reservation = + initial_query_reservation / clients_per_query + (gets_extra_byte ? 1 : 0); + // Reservation limit can be anything greater or equal to the initial reservation. 
+ int64_t client_reservation_limit = initial_client_reservation + rand() % 100000; + client_reservations[i][j].InitChildTracker( + NULL, query_reservation, NULL, client_reservation_limit); + EXPECT_TRUE( + client_reservations[i][j].IncreaseReservationToFit(initial_client_reservation)); + string name = Substitute("Client $0 for query $1", j, query_id); + EXPECT_OK(pool->RegisterClient(name, &client_reservations[i][j], &clients[i][j])); + } + + for (int j = 0; j < clients_per_query; ++j) { + ASSERT_TRUE(clients[i][j].is_registered()); + } + } + + // Deregister clients then query. + for (int i = 0; i < num_queries; ++i) { + for (int j = 0; j < clients_per_query; ++j) { + pool->DeregisterClient(&clients[i][j]); + ASSERT_FALSE(clients[i][j].is_registered()); + client_reservations[i][j].Close(); + } + + delete[] clients[i]; + delete[] client_reservations[i]; + + GetQueryReservationTracker(QueryId(query_id_hi, i))->Close(); + } +} + +/// Test that queries and clients can be registered and deregistered with the reservation +/// trackers and the buffer pool. +TEST_F(BufferPoolTest, BasicRegistration) { + int num_concurrent_queries = 1024; + int64_t sum_initial_reservations = 4; + int64_t reservation_limit = 1024; + // Need enough buffers for all initial reservations. + int64_t total_mem = sum_initial_reservations * num_concurrent_queries; + global_reservations_.InitRootTracker(NewProfile(), total_mem); + + BufferPool pool(TEST_PAGE_LEN, total_mem); + + RegisterQueriesAndClients( + &pool, 0, num_concurrent_queries, sum_initial_reservations, reservation_limit); + + // All the reservations should be released at this point. Use gtest expectations + // rather than DCHECKs so that the checks also run in release builds and report a + // test failure instead of aborting the process. + EXPECT_EQ(global_reservations_.GetUsedReservation(), 0); + EXPECT_EQ(global_reservations_.GetChildReservations(), 0); + EXPECT_EQ(global_reservations_.GetReservation(), 0); + global_reservations_.Close(); +} + +/// Test that queries and clients can be registered and deregistered by concurrent +/// threads. 
+TEST_F(BufferPoolTest, ConcurrentRegistration) { + int queries_per_thread = 64; + int num_threads = 64; + int num_concurrent_queries = queries_per_thread * num_threads; + int64_t sum_initial_reservations = 4; + int64_t reservation_limit = 1024; + // Need enough buffers for all initial reservations. + int64_t total_mem = num_concurrent_queries * sum_initial_reservations; + global_reservations_.InitRootTracker(NewProfile(), total_mem); + + BufferPool pool(TEST_PAGE_LEN, total_mem); + + // Launch threads, each with a different set of query IDs. + thread_group workers; + for (int i = 0; i < num_threads; ++i) { + workers.add_thread(new thread(bind(&BufferPoolTest::RegisterQueriesAndClients, this, + &pool, i, queries_per_thread, sum_initial_reservations, reservation_limit))); + } + workers.join_all(); + + // All the reservations should be released at this point. Use gtest expectations + // rather than DCHECKs so that the checks also run in release builds and report a + // test failure instead of aborting the process. + EXPECT_EQ(global_reservations_.GetUsedReservation(), 0); + EXPECT_EQ(global_reservations_.GetReservation(), 0); + global_reservations_.Close(); +} + +/// Test that reservation setup fails if the initial buffers cannot be fulfilled. +TEST_F(BufferPoolTest, QueryReservationsUnfulfilled) { + Status status; + int num_queries = 128; + int64_t reservation_per_query = 128; + // Won't be able to fulfill initial reservation for last query. + int64_t total_mem = num_queries * reservation_per_query - 1; + global_reservations_.InitRootTracker(NewProfile(), total_mem); + + for (int i = 0; i < num_queries; ++i) { + ReservationTracker* query_tracker = GetQueryReservationTracker(i); + query_tracker->InitChildTracker( + NewProfile(), &global_reservations_, NULL, 2 * reservation_per_query); + bool got_initial_reservation = + query_tracker->IncreaseReservationToFit(reservation_per_query); + if (i < num_queries - 1) { + ASSERT_TRUE(got_initial_reservation); + } else { + ASSERT_FALSE(got_initial_reservation); + + // Getting the initial reservation should succeed after freeing up buffers from + // other query. 
+ GetQueryReservationTracker(i - 1)->Close(); + ASSERT_TRUE(query_tracker->IncreaseReservationToFit(reservation_per_query)); + } + } +} +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST); + return RUN_ALL_TESTS(); +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/bufferpool/buffer-pool.cc ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/buffer-pool.cc b/be/src/bufferpool/buffer-pool.cc new file mode 100644 index 0000000..21893fc --- /dev/null +++ b/be/src/bufferpool/buffer-pool.cc @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "bufferpool/buffer-pool.h" + +#include <boost/bind.hpp> +#include <limits> +#include <sstream> + +#include "bufferpool/reservation-tracker.h" +#include "common/names.h" +#include "gutil/strings/substitute.h" +#include "util/bit-util.h" +#include "util/uid-util.h" + +using strings::Substitute; + +namespace impala { + +BufferPool::BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit) + : min_buffer_len_(min_buffer_len), buffer_bytes_limit_(buffer_bytes_limit) { + DCHECK_GT(min_buffer_len, 0); + DCHECK_EQ(min_buffer_len, BitUtil::RoundUpToPowerOfTwo(min_buffer_len)); +} + +BufferPool::~BufferPool() {} + +Status BufferPool::RegisterClient( + const string& name, ReservationTracker* reservation, Client* client) { + DCHECK(!client->is_registered()); + DCHECK(reservation != NULL); + client->reservation_ = reservation; + client->name_ = name; + return Status::OK(); +} + +void BufferPool::DeregisterClient(Client* client) { + if (!client->is_registered()) return; + client->reservation_->Close(); + client->name_.clear(); + client->reservation_ = NULL; +} + +string BufferPool::Client::DebugString() const { + return Substitute("<BufferPool::Client> $0 name: $1 reservation: $2", this, name_, + reservation_->DebugString()); +} + +string BufferPool::DebugString() { + stringstream ss; + ss << "<BufferPool> " << this << " min_buffer_len: " << min_buffer_len_ + << " buffer_bytes_limit: " << buffer_bytes_limit_ << "\n"; + return ss.str(); +} +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/bufferpool/buffer-pool.h ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/buffer-pool.h b/be/src/bufferpool/buffer-pool.h index 183e538..e566543 100644 --- a/be/src/bufferpool/buffer-pool.h +++ b/be/src/bufferpool/buffer-pool.h @@ -22,7 +22,6 @@ #include <string> #include <stdint.h> -#include "bufferpool/buffer-allocator.h" #include "common/atomic.h" #include "common/status.h" #include 
"gutil/macros.h" @@ -125,7 +124,7 @@ class ReservationTracker; /// calling Unpin() on the page. The page may then be written to disk and its buffer /// repurposed internally by BufferPool. /// * Once the operator needs the page's contents again and has sufficient unused -/// reservations, it can call Pin(), which brings the page's contents back into memory, +/// reservation, it can call Pin(), which brings the page's contents back into memory, /// perhaps in a different buffer. Therefore the operator must fix up any pointers into /// the previous buffer. /// * If the operator is done with the page, it can call FreeBuffer() to destroy the @@ -157,10 +156,10 @@ class BufferPool { ~BufferPool(); /// Register a client. Returns an error status and does not register the client if the - /// arguments are invalid. 'name' is an arbitrary used to identify the client in any - /// errors messages or logging. 'client' is the client to register. 'client' should not - /// already be registered. - Status RegisterClient(const std::string& name, ReservationTracker* reservations, + /// arguments are invalid. 'name' is an arbitrary name used to identify the client in + /// any errors messages or logging. 'client' is the client to register. 'client' should + /// not already be registered. + Status RegisterClient(const std::string& name, ReservationTracker* reservation, Client* client); /// Deregister 'client' if it is registered. Idempotent. @@ -168,7 +167,7 @@ class BufferPool { /// Create a new page of 'len' bytes with pin count 1. 'len' must be a page length /// supported by BufferPool (see BufferPool class comment). The client must have - /// sufficient unused reservations to pin the new page (otherwise it will DCHECK). + /// sufficient unused reservation to pin the new page (otherwise it will DCHECK). /// CreatePage() only fails when a system error prevents the buffer pool from fulfilling /// the reservation. /// On success, the handle is mapped to the new page. 
@@ -229,6 +228,13 @@ class BufferPool { private: DISALLOW_COPY_AND_ASSIGN(BufferPool); + + /// The minimum length of a buffer in bytes. All buffers and pages are a multiple of + /// this length. This is always a power of two. + const int64_t min_buffer_len_; + + /// The maximum physical memory in bytes that can be used for buffers. + const int64_t buffer_bytes_limit_; }; /// External representation of a client of the BufferPool. Clients are used for @@ -239,17 +245,25 @@ class BufferPool { /// Client methods or BufferPool methods with the Client as an argument is not supported. class BufferPool::Client { public: - Client() {} + Client() : reservation_(NULL) {} /// Client must be deregistered. ~Client() { DCHECK(!is_registered()); } - bool is_registered() const; - ReservationTracker* reservations(); + bool is_registered() const { return reservation_ != NULL; } + ReservationTracker* reservation() { return reservation_; } std::string DebugString() const; private: + friend class BufferPool; DISALLOW_COPY_AND_ASSIGN(Client); + + /// A name identifying the client. + std::string name_; + + /// The reservation tracker for the client. NULL means the client isn't registered. + /// All pages pinned by the client count as usage against 'reservation_'. + ReservationTracker* reservation_; }; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/bufferpool/reservation-tracker-counters.h ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/reservation-tracker-counters.h b/be/src/bufferpool/reservation-tracker-counters.h new file mode 100644 index 0000000..0952a2f --- /dev/null +++ b/be/src/bufferpool/reservation-tracker-counters.h @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_RESERVATION_TRACKER_COUNTERS_H +#define IMPALA_RESERVATION_TRACKER_COUNTERS_H + +#include "util/runtime-profile.h" + +namespace impala { + +/// A set of counters for each ReservationTracker for reporting purposes. +/// +/// If the ReservationTracker is linked to a profile these have the same lifetime as that +/// profile, otherwise they have the same lifetime as the ReservationTracker itself. +struct ReservationTrackerCounters { + /// The tracker's peak reservation in bytes. + RuntimeProfile::HighWaterMarkCounter* peak_reservation; + + /// The tracker's peak usage in bytes. + RuntimeProfile::HighWaterMarkCounter* peak_used_reservation; + + /// The hard limit on the tracker's reservations + RuntimeProfile::Counter* reservation_limit; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/bufferpool/reservation-tracker-test.cc ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/reservation-tracker-test.cc b/be/src/bufferpool/reservation-tracker-test.cc new file mode 100644 index 0000000..e43efb8 --- /dev/null +++ b/be/src/bufferpool/reservation-tracker-test.cc @@ -0,0 +1,385 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gtest/gtest.h> +#include <limits> +#include <string> +#include <vector> + +#include "bufferpool/reservation-tracker.h" +#include "common/init.h" +#include "common/object-pool.h" +#include "runtime/mem-tracker.h" +#include "testutil/test-macros.h" + +#include "common/names.h" + +using strings::Substitute; + +namespace impala { + +class ReservationTrackerTest : public ::testing::Test { + public: + virtual void SetUp() {} + + virtual void TearDown() { + root_.Close(); + obj_pool_.Clear(); + } + + /// The minimum allocation size used in most tests. + const static int64_t MIN_BUFFER_LEN = 1024; + + protected: + RuntimeProfile* NewProfile() { + return obj_pool_.Add(new RuntimeProfile(&obj_pool_, "test profile")); + } + + ObjectPool obj_pool_; + + ReservationTracker root_; + + scoped_ptr<RuntimeProfile> profile_; +}; + +const int64_t ReservationTrackerTest::MIN_BUFFER_LEN; + +TEST_F(ReservationTrackerTest, BasicSingleTracker) { + const int64_t limit = 16; + root_.InitRootTracker(NULL, limit); + ASSERT_EQ(0, root_.GetReservation()); + ASSERT_EQ(0, root_.GetUsedReservation()); + + // Fail to increase reservation. 
+ ASSERT_FALSE(root_.IncreaseReservation(limit + 1)); + ASSERT_EQ(0, root_.GetReservation()); + ASSERT_EQ(0, root_.GetUsedReservation()); + ASSERT_EQ(0, root_.GetUnusedReservation()); + + // Successfully increase reservation. + ASSERT_TRUE(root_.IncreaseReservation(limit - 1)); + ASSERT_EQ(limit - 1, root_.GetReservation()); + ASSERT_EQ(0, root_.GetUsedReservation()); + ASSERT_EQ(limit - 1, root_.GetUnusedReservation()); + + // Adjust usage. + root_.AllocateFrom(2); + ASSERT_EQ(limit - 1, root_.GetReservation()); + ASSERT_EQ(2, root_.GetUsedReservation()); + ASSERT_EQ(limit - 3, root_.GetUnusedReservation()); + root_.ReleaseTo(1); + ASSERT_EQ(1, root_.GetUsedReservation()); + root_.ReleaseTo(1); + ASSERT_EQ(0, root_.GetUsedReservation()); + ASSERT_EQ(limit - 1, root_.GetReservation()); + ASSERT_EQ(limit - 1, root_.GetUnusedReservation()); +} + +TEST_F(ReservationTrackerTest, BasicTwoLevel) { + const int64_t limit = 16; + root_.InitRootTracker(NULL, limit); + + const int64_t root_reservation = limit / 2; + // Get half of the limit as an initial reservation. + ASSERT_TRUE(root_.IncreaseReservation(root_reservation)); + ASSERT_EQ(root_reservation, root_.GetReservation()); + ASSERT_EQ(root_reservation, root_.GetUnusedReservation()); + ASSERT_EQ(0, root_.GetUsedReservation()); + ASSERT_EQ(0, root_.GetChildReservations()); + + ReservationTracker child; + child.InitChildTracker(NULL, &root_, NULL, numeric_limits<int64_t>::max()); + + const int64_t child_reservation = root_reservation + 1; + // Get parent's reservation plus a bit more. 
+ ASSERT_TRUE(child.IncreaseReservation(child_reservation)); + + ASSERT_EQ(child_reservation, root_.GetReservation()); + ASSERT_EQ(0, root_.GetUnusedReservation()); + ASSERT_EQ(0, root_.GetUsedReservation()); + ASSERT_EQ(child_reservation, root_.GetChildReservations()); + + ASSERT_EQ(child_reservation, child.GetReservation()); + ASSERT_EQ(child_reservation, child.GetUnusedReservation()); + ASSERT_EQ(0, child.GetUsedReservation()); + ASSERT_EQ(0, child.GetChildReservations()); + + // Check that child allocation is reflected correctly. + child.AllocateFrom(1); + ASSERT_EQ(child_reservation, child.GetReservation()); + ASSERT_EQ(1, child.GetUsedReservation()); + ASSERT_EQ(0, root_.GetUsedReservation()); + + // Check that parent reservation increase is reflected correctly. + ASSERT_TRUE(root_.IncreaseReservation(1)); + ASSERT_EQ(child_reservation + 1, root_.GetReservation()); + ASSERT_EQ(1, root_.GetUnusedReservation()); + ASSERT_EQ(0, root_.GetUsedReservation()); + ASSERT_EQ(child_reservation, root_.GetChildReservations()); + ASSERT_EQ(child_reservation, child.GetReservation()); + + // Check that parent allocation is reflected correctly. + root_.AllocateFrom(1); + ASSERT_EQ(child_reservation + 1, root_.GetReservation()); + ASSERT_EQ(0, root_.GetUnusedReservation()); + ASSERT_EQ(1, root_.GetUsedReservation()); + + // Release allocations. + root_.ReleaseTo(1); + ASSERT_EQ(0, root_.GetUsedReservation()); + child.ReleaseTo(1); + ASSERT_EQ(0, child.GetUsedReservation()); + + // Child reservation should be returned all the way up the tree. + child.DecreaseReservation(1); + ASSERT_EQ(child_reservation, root_.GetReservation()); + ASSERT_EQ(child_reservation - 1, child.GetReservation()); + ASSERT_EQ(child_reservation - 1, root_.GetChildReservations()); + + // Closing the child should release its reservation. 
+ child.Close(); + ASSERT_EQ(1, root_.GetReservation()); + ASSERT_EQ(0, root_.GetChildReservations()); +} + +TEST_F(ReservationTrackerTest, CloseIdempotency) { + // Check we can close before opening. + root_.Close(); + + const int64_t limit = 16; + root_.InitRootTracker(NULL, limit); + + // Check we can close twice + root_.Close(); + root_.Close(); +} + +// Test that the tracker's reservation limit is enforced. +TEST_F(ReservationTrackerTest, ReservationLimit) { + Status status; + // Setup trackers so that there is a spare buffer that the client isn't entitled to. + int64_t total_mem = MIN_BUFFER_LEN * 3; + int64_t client_limit = MIN_BUFFER_LEN * 2; + root_.InitRootTracker(NULL, total_mem); + + ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); + client_tracker->InitChildTracker(NULL, &root_, NULL, client_limit); + + // We can only increase reservation up to the client's limit. + ASSERT_FALSE(client_tracker->IncreaseReservation(client_limit + 1)); + ASSERT_TRUE(client_tracker->IncreaseReservation(client_limit)); + ASSERT_FALSE(client_tracker->IncreaseReservation(1)); + + client_tracker->Close(); +} + +// Test that parent's reservation limit is enforced. +TEST_F(ReservationTrackerTest, ParentReservationLimit) { + Status status; + // Setup reservations so that there is a spare buffer. + int64_t total_mem = MIN_BUFFER_LEN * 4; + int64_t parent_limit = MIN_BUFFER_LEN * 3; + int64_t other_client_reservation = MIN_BUFFER_LEN; + root_.InitRootTracker(NULL, total_mem); + + // The child reservation limit is higher than the parent reservation limit, so the + // parent limit is the effective limit. 
+ ReservationTracker* query_tracker = obj_pool_.Add(new ReservationTracker()); + ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker()); + query_tracker->InitChildTracker(NULL, &root_, NULL, parent_limit); + client_tracker->InitChildTracker(NULL, query_tracker, NULL, total_mem * 10); + + ReservationTracker* other_client_tracker = obj_pool_.Add(new ReservationTracker()); + other_client_tracker->InitChildTracker(NULL, query_tracker, NULL, total_mem); + ASSERT_TRUE(other_client_tracker->IncreaseReservationToFit(other_client_reservation)); + ASSERT_EQ(root_.GetUsedReservation(), 0); + ASSERT_EQ(root_.GetChildReservations(), other_client_reservation); + ASSERT_EQ(query_tracker->GetUsedReservation(), 0); + ASSERT_EQ(query_tracker->GetUnusedReservation(), 0); + + // Can only increase reservation up to parent limit, excluding other reservations. + int64_t effective_limit = parent_limit - other_client_reservation; + ASSERT_FALSE(client_tracker->IncreaseReservation(effective_limit + MIN_BUFFER_LEN)); + ASSERT_TRUE(client_tracker->IncreaseReservation(effective_limit)); + ASSERT_FALSE(client_tracker->IncreaseReservation(MIN_BUFFER_LEN)); + + // Check that tracker hierarchy reports correct usage. + ASSERT_EQ(root_.GetUsedReservation(), 0); + ASSERT_EQ(root_.GetChildReservations(), parent_limit); + ASSERT_EQ(query_tracker->GetUsedReservation(), 0); + ASSERT_EQ(query_tracker->GetUnusedReservation(), 0); + + client_tracker->Close(); + other_client_tracker->Close(); + query_tracker->Close(); +} + +/// Test integration of ReservationTracker with MemTracker. +TEST_F(ReservationTrackerTest, MemTrackerIntegrationTwoLevel) { + // Setup a 2-level hierarchy of trackers. The child ReservationTracker is linked to + // the child MemTracker. We add various limits at different places to enable testing + // of different code paths. 
+ root_.InitRootTracker(NewProfile(), MIN_BUFFER_LEN * 100); + MemTracker root_mem_tracker; + MemTracker child_mem_tracker1(-1, -1, "Child 1", &root_mem_tracker); + MemTracker child_mem_tracker2(MIN_BUFFER_LEN * 50, -1, "Child 2", &root_mem_tracker); + ReservationTracker child_reservations1, child_reservations2; + child_reservations1.InitChildTracker( + NewProfile(), &root_, &child_mem_tracker1, 500 * MIN_BUFFER_LEN); + child_reservations2.InitChildTracker( + NewProfile(), &root_, &child_mem_tracker2, 75 * MIN_BUFFER_LEN); + + // Check that a single buffer reservation is accounted correctly. + ASSERT_TRUE(child_reservations1.IncreaseReservation(MIN_BUFFER_LEN)); + ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation()); + ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption()); + ASSERT_EQ(MIN_BUFFER_LEN, root_mem_tracker.consumption()); + ASSERT_EQ(MIN_BUFFER_LEN, root_.GetChildReservations()); + + // Check that a buffer reservation from the other child is accounted correctly. + ASSERT_TRUE(child_reservations2.IncreaseReservation(MIN_BUFFER_LEN)); + ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation()); + ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption()); + ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption()); + ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations()); + + // Check that hitting the MemTracker limit leaves things in a consistent state. + ASSERT_FALSE(child_reservations2.IncreaseReservation(MIN_BUFFER_LEN * 50)); + ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation()); + ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption()); + ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation()); + ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption()); + ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption()); + ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations()); + + // Check that hitting the ReservationTracker's local limit leaves things in a + // consistent state. 
+ ASSERT_FALSE(child_reservations2.IncreaseReservation(MIN_BUFFER_LEN * 75)); + ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation()); + ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption()); + ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation()); + ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption()); + ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption()); + ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations()); + + // Check that hitting the ReservationTracker's parent's limit after the + // MemTracker consumption is incremented leaves things in a consistent state. + ASSERT_FALSE(child_reservations1.IncreaseReservation(MIN_BUFFER_LEN * 100)); + ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation()); + ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption()); + ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation()); + ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption()); + ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption()); + ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations()); + + // Check that released memory is decremented from all trackers correctly. + child_reservations1.DecreaseReservation(MIN_BUFFER_LEN); + child_reservations2.DecreaseReservation(MIN_BUFFER_LEN); + ASSERT_EQ(0, child_reservations2.GetReservation()); + ASSERT_EQ(0, child_mem_tracker2.consumption()); + ASSERT_EQ(0, root_mem_tracker.consumption()); + ASSERT_EQ(0, root_.GetUsedReservation()); + + child_reservations1.Close(); + child_reservations2.Close(); + child_mem_tracker1.UnregisterFromParent(); + child_mem_tracker2.UnregisterFromParent(); +} + +TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) { + const int HIERARCHY_DEPTH = 5; + // Setup a multi-level hierarchy of trackers and ensure that consumption is reported + // correctly. 
+ ReservationTracker reservations[HIERARCHY_DEPTH]; + scoped_ptr<MemTracker> mem_trackers[HIERARCHY_DEPTH]; + + // We can only handle MemTracker limits at the topmost linked ReservationTracker, + // so avoid adding limits at lower level. + const int LIMIT = HIERARCHY_DEPTH; + vector<int> mem_limits({LIMIT * 10, LIMIT, -1, -1, -1}); + + // Root trackers aren't linked. + mem_trackers[0].reset(new MemTracker(mem_limits[0])); + reservations[0].InitRootTracker(NewProfile(), 500); + for (int i = 1; i < HIERARCHY_DEPTH; ++i) { + mem_trackers[i].reset(new MemTracker( + mem_limits[i], -1, Substitute("Child $0", i), mem_trackers[i - 1].get())); + reservations[i].InitChildTracker( + NewProfile(), &reservations[i - 1], mem_trackers[i].get(), 500); + } + + vector<int> interesting_amounts({LIMIT - 1, LIMIT, LIMIT + 1}); + + // Test that all limits and consumption correctly reported when consuming + // from a non-root ReservationTracker that is connected to a MemTracker. + for (int level = 1; level < HIERARCHY_DEPTH; ++level) { + int64_t lowest_limit = mem_trackers[level]->lowest_limit(); + for (int amount : interesting_amounts) { + bool increased = reservations[level].IncreaseReservation(amount); + if (lowest_limit == -1 || amount <= lowest_limit) { + // The increase should go through. + ASSERT_TRUE(increased) << reservations[level].DebugString(); + ASSERT_EQ(amount, reservations[level].GetReservation()); + ASSERT_EQ(amount, mem_trackers[level]->consumption()); + for (int ancestor = 0; ancestor < level; ++ancestor) { + ASSERT_EQ(amount, reservations[ancestor].GetChildReservations()); + ASSERT_EQ(amount, mem_trackers[ancestor]->consumption()); + } + + LOG(INFO) << "\n" << mem_trackers[0]->LogUsage(); + reservations[level].DecreaseReservation(amount); + } else { + ASSERT_FALSE(increased); + } + // We should be back in the original state. 
+ for (int i = 0; i < HIERARCHY_DEPTH; ++i) { + ASSERT_EQ(0, reservations[i].GetReservation()) << i << ": " + << reservations[i].DebugString(); + ASSERT_EQ(0, reservations[i].GetChildReservations()); + ASSERT_EQ(0, mem_trackers[i]->consumption()); + } + } + } + + // "Pull down" a reservation from the top of the hierarchy level-by-level to the + // leaves, checking that everything is consistent at each step. + for (int level = 0; level < HIERARCHY_DEPTH; ++level) { + const int amount = LIMIT; + ASSERT_TRUE(reservations[level].IncreaseReservation(amount)); + ASSERT_EQ(amount, reservations[level].GetReservation()); + ASSERT_EQ(0, reservations[level].GetUsedReservation()); + if (level != 0) ASSERT_EQ(amount, mem_trackers[level]->consumption()); + for (int ancestor = 0; ancestor < level; ++ancestor) { + ASSERT_EQ(0, reservations[ancestor].GetUsedReservation()); + ASSERT_EQ(amount, reservations[ancestor].GetChildReservations()); + ASSERT_EQ(amount, mem_trackers[ancestor]->consumption()); + } + reservations[level].DecreaseReservation(amount); + } + + for (int i = HIERARCHY_DEPTH - 1; i >= 0; --i) { + reservations[i].Close(); + if (i != 0) mem_trackers[i]->UnregisterFromParent(); + } +} +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST); + return RUN_ALL_TESTS(); +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/bufferpool/reservation-tracker.cc ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/reservation-tracker.cc b/be/src/bufferpool/reservation-tracker.cc new file mode 100644 index 0000000..1c5a14e --- /dev/null +++ b/be/src/bufferpool/reservation-tracker.cc @@ -0,0 +1,308 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "bufferpool/reservation-tracker.h" + +#include <algorithm> + +#include "common/object-pool.h" +#include "gutil/strings/substitute.h" +#include "runtime/mem-tracker.h" +#include "util/dummy-runtime-profile.h" +#include "util/runtime-profile-counters.h" + +#include "common/names.h" + +using strings::Substitute; + +namespace impala { + +ReservationTracker::ReservationTracker() : initialized_(false), mem_tracker_(NULL) {} + +ReservationTracker::~ReservationTracker() { + DCHECK(!initialized_); +} + +void ReservationTracker::InitRootTracker( + RuntimeProfile* profile, int64_t reservation_limit) { + lock_guard<SpinLock> l(lock_); + DCHECK(!initialized_); + parent_ = NULL; + mem_tracker_ = NULL; + reservation_limit_ = reservation_limit; + reservation_ = 0; + used_reservation_ = 0; + child_reservations_ = 0; + initialized_ = true; + + InitCounters(profile, reservation_limit_); + COUNTER_SET(counters_.peak_reservation, reservation_); + + CheckConsistency(); +} + +void ReservationTracker::InitChildTracker(RuntimeProfile* profile, + ReservationTracker* parent, MemTracker* mem_tracker, int64_t reservation_limit) { + DCHECK(parent != NULL); + DCHECK_GE(reservation_limit, 0); + + lock_guard<SpinLock> l(lock_); + DCHECK(!initialized_); + parent_ = parent; + mem_tracker_ = 
mem_tracker; + + reservation_limit_ = reservation_limit; + reservation_ = 0; + used_reservation_ = 0; + child_reservations_ = 0; + initialized_ = true; + + if (mem_tracker_ != NULL) { + MemTracker* parent_mem_tracker = GetParentMemTracker(); + if (parent_mem_tracker != NULL) { + // Make sure the parent links of the MemTrackers correspond to our parent links. + DCHECK_EQ(parent_mem_tracker, mem_tracker_->parent()); + // Make sure we don't have a lower limit than the ancestor, since we don't enforce + // limits at lower links. + DCHECK_EQ(mem_tracker_->lowest_limit(), parent_mem_tracker->lowest_limit()); + } else { + // Make sure we didn't leave a gap in the links. E.g. this tracker's grandparent + // shouldn't have a MemTracker. + ReservationTracker* ancestor = parent_; + while (ancestor != NULL) { + DCHECK(ancestor->mem_tracker_ == NULL); + ancestor = ancestor->parent_; + } + } + } + + InitCounters(profile, reservation_limit_); + + CheckConsistency(); +} + +void ReservationTracker::InitCounters( + RuntimeProfile* profile, int64_t reservation_limit) { + bool profile_provided = profile != NULL; + if (profile == NULL) { + dummy_profile_.reset(new DummyProfile); + profile = dummy_profile_->profile(); + } + + // Check that another tracker's counters aren't already registered in the profile. 
+ // 'BufferPoolReservationLimit' is the first counter registered below, so finding it + // here means another tracker already added its counters to this profile. + DCHECK(profile->GetCounter("BufferPoolReservationLimit") == NULL); + counters_.reservation_limit = + ADD_COUNTER(profile, "BufferPoolReservationLimit", TUnit::BYTES); + counters_.peak_reservation = + profile->AddHighWaterMarkCounter("BufferPoolPeakReservation", TUnit::BYTES); + counters_.peak_used_reservation = + profile->AddHighWaterMarkCounter("BufferPoolPeakUsedReservation", TUnit::BYTES); + + COUNTER_SET(counters_.reservation_limit, reservation_limit); + + if (mem_tracker_ != NULL && profile_provided) { + mem_tracker_->EnableReservationReporting(counters_); + } +} + +void ReservationTracker::Close() { + lock_guard<SpinLock> l(lock_); + if (!initialized_) return; + CheckConsistency(); + DCHECK_EQ(used_reservation_, 0); + DCHECK_EQ(child_reservations_, 0); + // Release any reservation to parent. + if (parent_ != NULL) DecreaseReservationInternalLocked(reservation_, false); + mem_tracker_ = NULL; + parent_ = NULL; + initialized_ = false; +} + +bool ReservationTracker::IncreaseReservation(int64_t bytes) { + lock_guard<SpinLock> l(lock_); + return IncreaseReservationInternalLocked(bytes, false, false); +} + +bool ReservationTracker::IncreaseReservationToFit(int64_t bytes) { + lock_guard<SpinLock> l(lock_); + return IncreaseReservationInternalLocked(bytes, true, false); +} + +bool ReservationTracker::IncreaseReservationInternalLocked( + int64_t bytes, bool use_existing_reservation, bool is_child_reservation) { + DCHECK(initialized_); + int64_t reservation_increase = + use_existing_reservation ? max<int64_t>(0, bytes - unused_reservation()) : bytes; + DCHECK_GE(reservation_increase, 0); + + bool granted; + // Check if the increase is allowed, starting at the bottom of hierarchy. 
+ if (reservation_ + reservation_increase > reservation_limit_) { + granted = false; + } else if (reservation_increase == 0) { + granted = true; + } else { + if (parent_ == NULL) { + granted = true; + } else { + lock_guard<SpinLock> l(parent_->lock_); + granted = + parent_->IncreaseReservationInternalLocked(reservation_increase, true, true); + } + if (granted && !TryUpdateMemTracker(reservation_increase)) { + granted = false; + // Roll back changes to ancestors if MemTracker update fails. + parent_->DecreaseReservationInternal(reservation_increase, true); + } + } + + if (granted) { + // The reservation was granted and state updated in all ancestors: we can modify + // this tracker's state now. + UpdateReservation(reservation_increase); + if (is_child_reservation) child_reservations_ += bytes; + } + + CheckConsistency(); + return granted; +} + +bool ReservationTracker::TryUpdateMemTracker(int64_t reservation_increase) { + if (mem_tracker_ == NULL) return true; + if (GetParentMemTracker() == NULL) { + // At the topmost link, which may be a MemTracker with a limit, we need to use + // TryConsume() to check the limit. + return mem_tracker_->TryConsume(reservation_increase); + } else { + // For lower links, there shouldn't be a limit to enforce, so we just need to + // update the consumption of the linked MemTracker since the reservation is + // already reflected in its parent. 
+ mem_tracker_->ConsumeLocal(reservation_increase, GetParentMemTracker()); + return true; + } +} + +void ReservationTracker::DecreaseReservation(int64_t bytes) { + DecreaseReservationInternal(bytes, false); +} + +void ReservationTracker::DecreaseReservationInternal( + int64_t bytes, bool is_child_reservation) { + lock_guard<SpinLock> l(lock_); + DecreaseReservationInternalLocked(bytes, is_child_reservation); +} + +void ReservationTracker::DecreaseReservationInternalLocked( + int64_t bytes, bool is_child_reservation) { + DCHECK(initialized_); + DCHECK_GE(reservation_, bytes); + if (bytes == 0) return; + if (is_child_reservation) child_reservations_ -= bytes; + UpdateReservation(-bytes); + // The reservation should be returned up the tree. + if (mem_tracker_ != NULL) { + if (GetParentMemTracker() == NULL) { + mem_tracker_->Release(bytes); + } else { + mem_tracker_->ReleaseLocal(bytes, GetParentMemTracker()); + } + } + if (parent_ != NULL) parent_->DecreaseReservationInternal(bytes, true); + CheckConsistency(); +} + +void ReservationTracker::AllocateFrom(int64_t bytes) { + lock_guard<SpinLock> l(lock_); + DCHECK(initialized_); + DCHECK_GE(bytes, 0); + DCHECK_LE(bytes, unused_reservation()); + UpdateUsedReservation(bytes); + CheckConsistency(); +} + +void ReservationTracker::ReleaseTo(int64_t bytes) { + lock_guard<SpinLock> l(lock_); + DCHECK(initialized_); + DCHECK_GE(bytes, 0); + DCHECK_LE(bytes, used_reservation_); + UpdateUsedReservation(-bytes); + CheckConsistency(); +} + +int64_t ReservationTracker::GetReservation() { + lock_guard<SpinLock> l(lock_); + DCHECK(initialized_); + return reservation_; +} + +int64_t ReservationTracker::GetUsedReservation() { + lock_guard<SpinLock> l(lock_); + DCHECK(initialized_); + return used_reservation_; +} + +int64_t ReservationTracker::GetUnusedReservation() { + lock_guard<SpinLock> l(lock_); + DCHECK(initialized_); + return unused_reservation(); +} + +int64_t ReservationTracker::GetChildReservations() { + lock_guard<SpinLock> 
l(lock_); + DCHECK(initialized_); + return child_reservations_; +} + +void ReservationTracker::CheckConsistency() const { + // Check internal invariants. + DCHECK_GE(reservation_, 0); + DCHECK_LE(reservation_, reservation_limit_); + DCHECK_GE(child_reservations_, 0); + DCHECK_GE(used_reservation_, 0); + DCHECK_LE(used_reservation_ + child_reservations_, reservation_); + + DCHECK_EQ(reservation_, counters_.peak_reservation->current_value()); + DCHECK_LE(reservation_, counters_.peak_reservation->value()); + DCHECK_EQ(used_reservation_, counters_.peak_used_reservation->current_value()); + DCHECK_LE(used_reservation_, counters_.peak_used_reservation->value()); + DCHECK_EQ(reservation_limit_, counters_.reservation_limit->value()); +} + +void ReservationTracker::UpdateUsedReservation(int64_t delta) { + used_reservation_ += delta; + COUNTER_SET(counters_.peak_used_reservation, used_reservation_); + CheckConsistency(); +} + +void ReservationTracker::UpdateReservation(int64_t delta) { + reservation_ += delta; + COUNTER_SET(counters_.peak_reservation, reservation_); + CheckConsistency(); +} + +string ReservationTracker::DebugString() { + lock_guard<SpinLock> l(lock_); + if (!initialized_) return "<ReservationTracker>: uninitialized"; + + string parent_debug_string = parent_ == NULL ? 
"NULL" : parent_->DebugString(); + return Substitute( + "<ReservationTracker>: reservation_limit $0 reservation $1 used_reservation $2 " + "child_reservations $3 parent:\n$4", + reservation_limit_, reservation_, used_reservation_, child_reservations_, + parent_debug_string); +} +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/bufferpool/reservation-tracker.h ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/reservation-tracker.h b/be/src/bufferpool/reservation-tracker.h new file mode 100644 index 0000000..6bdecf0 --- /dev/null +++ b/be/src/bufferpool/reservation-tracker.h @@ -0,0 +1,248 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_RESERVATION_TRACKER_H +#define IMPALA_RESERVATION_TRACKER_H + +#include <stdint.h> +#include <boost/scoped_ptr.hpp> +#include <boost/thread/locks.hpp> +#include <string> + +#include "bufferpool/reservation-tracker-counters.h" +#include "common/status.h" +#include "util/spinlock.h" + +namespace impala { + +class DummyProfile; +class MemTracker; +class RuntimeProfile; + +/// A tracker for a hierarchy of buffer pool memory reservations, denominated in bytes. 
+/// A hierarchy of ReservationTrackers provides a mechanism for subdividing buffer pool +/// memory and enforcing upper and lower bounds on memory usage. +/// +/// The root of the tracker tree enforces a global maximum, which is distributed among its +/// children. Each tracker in the tree has a 'reservation': the total bytes of buffer pool +/// memory it is entitled to use. The reservation is inclusive of any memory that is +/// already allocated from the reservation, i.e. using a reservation to allocate memory +/// does not subtract from the reservation. +/// +/// A reservation can be used directly at the tracker by calling AllocateFrom(), or +/// distributed to children of the tracker for the childrens' reservations. Each tracker +/// in the tree can use up to its reservation without checking parent trackers. To +/// increase its reservation, a tracker must use some of its parent's reservation (and +/// perhaps increase reservations all the way to the root of the tree). +/// +/// Each tracker also has a maximum reservation that is enforced. E.g. if the root of the +/// tracker hierarchy is the global tracker for the Impala daemon and the next level of +/// the hierarchy is made up of per-query trackers, then the maximum reservation +/// mechanism can enforce both process-level and query-level limits on reservations. +/// +/// Invariants: +/// * A tracker's reservation is at most its reservation limit: reservation <= limit +/// * A tracker's reservation is at least the sum of its childrens' reservations plus +/// the amount of the reservation used directly at this tracker. The difference is +/// the unused reservation: +/// child_reservations + used_reservation + unused_reservation = reservation. +/// +/// Thread-safety: +/// All public ReservationTracker methods are thread-safe. If multiple threads +/// concurrently invoke methods on a ReservationTracker, each operation is applied +/// atomically to leave the ReservationTracker in a consistent state. 
Calling threads +/// are responsible for coordinating to avoid violating any method preconditions, +/// e.g. ensuring that there is sufficient unused reservation before calling AllocateFrom(). +/// +/// Integration with MemTracker hierarchy: +/// TODO: we will remove MemTracker and this integration once all memory is accounted via +/// reservations. +/// +/// Each ReservationTracker can optionally have a linked MemTracker. E.g. an exec +/// node's ReservationTracker can be linked with the exec node's MemTracker, so that +/// reservations are included in query memory consumption for the purposes of enforcing +/// memory limits, reporting and logging. The reservation is accounted as consumption +/// against the linked MemTracker and its ancestors because reserved memory is committed. +/// Allocating from a reservation therefore does not change the consumption reflected in +/// the MemTracker hierarchy. +/// +/// MemTracker limits are only checked via the topmost link (i.e. the query-level +/// trackers): we require that no MemTrackers below this level have limits. +/// +/// We require that the MemTracker hierarchy is consistent with the ReservationTracker +/// hierarchy. I.e. if a ReservationTracker is linked to a MemTracker "A", and its parent +/// is linked to a MemTracker "B", then "B" must be the parent of "A". +class ReservationTracker { + public: + ReservationTracker(); + virtual ~ReservationTracker(); + + /// Initializes the root tracker with the given reservation limit in bytes. The initial + /// reservation is 0. + /// If 'profile' is not NULL, the counters defined in ReservationTrackerCounters are + /// added to 'profile'. + void InitRootTracker(RuntimeProfile* profile, int64_t reservation_limit); + + /// Initializes a new ReservationTracker with a parent. + /// If 'mem_tracker' is not NULL, reservations for this ReservationTracker and its + /// children will be counted as consumption against 'mem_tracker'. 
+ /// 'reservation_limit' is the maximum reservation for this tracker in bytes. + /// if 'profile' is not NULL, the counters in 'counters_' are added to 'profile'. + void InitChildTracker(RuntimeProfile* profile, ReservationTracker* parent, + MemTracker* mem_tracker, int64_t reservation_limit); + + /// If the tracker is initialized, deregister the ReservationTracker from its parent, + /// relinquishing all this tracker's reservation. All of the reservation must be unused + /// and all the tracker's children must be closed before calling this method. + void Close(); + + /// Request to increase reservation by 'bytes'. The request is either granted in + /// full or not at all. Uses any unused reservation on ancestors and increase + /// ancestors' reservations if needed to fit the increased reservation. + /// Returns true if the reservation increase is granted, or false if not granted. + /// If the reservation is not granted, no modifications are made to the state of + /// any ReservationTrackers. + bool IncreaseReservation(int64_t bytes); + + /// Tries to ensure that 'bytes' of unused reservation is available. If not already + /// available, tries to increase the reservation such that the unused reservation is + /// exactly equal to 'bytes'. Uses any unused reservation on ancestors and increase + /// ancestors' reservations if needed to fit the increased reservation. + /// Returns true if the reservation increase was successful or not necessary. + bool IncreaseReservationToFit(int64_t bytes); + + /// Decrease tracker's reservation by 'bytes'. This tracker's reservation must be at + /// least 'bytes' before calling this method. + /// TODO: decide on and implement policy for how far to release the reservation up + /// the tree. Currently the reservation is released all the way to the root. + void DecreaseReservation(int64_t bytes); + + /// Allocate 'bytes' from the reservation. The tracker must have at least 'bytes' + /// unused reservation before calling this method. 
+ void AllocateFrom(int64_t bytes); + + /// Release 'bytes' of previously allocated memory. The used reservation is + /// decreased by 'bytes'. The used reservation must be at least 'bytes' before + /// calling this method. + void ReleaseTo(int64_t bytes); + + /// Returns the amount of the reservation in bytes. + int64_t GetReservation(); + + /// Returns the current amount of the reservation used at this tracker, not including + /// reservations of children in bytes. + int64_t GetUsedReservation(); + + /// Returns the amount of the reservation neither used nor given to children's + /// reservations at this tracker in bytes. + int64_t GetUnusedReservation(); + + /// Returns the total reservations of children in bytes. + int64_t GetChildReservations(); + + /// Returns a human-readable summary of this tracker's limit, reservation, used + /// reservation and child reservations, followed by the same summary for its parent. + /// Returns a fixed "uninitialized" message if the tracker is not initialized. + std::string DebugString(); + + private: + /// Returns the amount of 'reservation_' that is unused. + inline int64_t unused_reservation() const { + return reservation_ - used_reservation_ - child_reservations_; + } + + /// Returns the parent's memtracker if 'parent_' is non-NULL, or NULL otherwise. + MemTracker* GetParentMemTracker() const { + return parent_ == NULL ? NULL : parent_->mem_tracker_; + } + + /// Initializes 'counters_', storing the counters in 'profile'. + /// If 'profile' is NULL, creates a dummy profile to store the counters. + /// 'max_reservation' is the value the reservation limit counter is set to. + void InitCounters(RuntimeProfile* profile, int64_t max_reservation); + + /// Internal helper for IncreaseReservation(). If 'use_existing_reservation' is true, + /// increase by the minimum amount so that 'bytes' fits in the reservation, otherwise + /// just increase by 'bytes'. If 'is_child_reservation' is true, also increase + /// 'child_reservations_' by 'bytes'. + /// 'lock_' must be held by caller. + bool IncreaseReservationInternalLocked( + int64_t bytes, bool use_existing_reservation, bool is_child_reservation); + + /// Update consumption on linked MemTracker. For the topmost link, return false if + /// this failed because it would exceed a memory limit. 
If there is no linked + /// MemTracker, just returns true. + /// TODO: remove once we account all memory via ReservationTrackers. + bool TryUpdateMemTracker(int64_t reservation_increase); + + /// Internal helper for DecreaseReservation(). This behaves the same as + /// DecreaseReservation(), except when 'is_child_reservation' is true it decreases + /// 'child_reservations_' by 'bytes'. + void DecreaseReservationInternal(int64_t bytes, bool is_child_reservation); + + /// Same as DecreaseReservationInternal(), but 'lock_' must be held by caller. + void DecreaseReservationInternalLocked(int64_t bytes, bool is_child_reservation); + + /// Check the internal consistency of the ReservationTracker and DCHECKs if in an + /// inconsistent state. + /// 'lock_' must be held by caller. + void CheckConsistency() const; + + /// Increase or decrease 'used_reservation_' and update profile counters accordingly. + /// 'lock_' must be held by caller. + void UpdateUsedReservation(int64_t delta); + + /// Increase or decrease 'reservation_' and update profile counters accordingly. + /// 'lock_' must be held by caller. + void UpdateReservation(int64_t delta); + + /// lock_ protects all members. In a hierarchy of trackers, locks must be acquired + /// from the bottom-up. + SpinLock lock_; + + /// True if the tracker is initialized. + bool initialized_; + + /// A dummy profile to hold the counters in 'counters_' in the case that no profile + /// is provided. + boost::scoped_ptr<DummyProfile> dummy_profile_; + + /// The RuntimeProfile counters for this tracker. + /// All non-NULL if 'initialized_' is true. + ReservationTrackerCounters counters_; + + /// The parent of this tracker in the hierarchy. Does not change after initialization. + ReservationTracker* parent_; + + /// If non-NULL, reservations are counted as memory consumption against this tracker. + /// Does not change after initialization. Not owned. + /// TODO: remove once all memory is accounted via ReservationTrackers. 
+ MemTracker* mem_tracker_; + + /// The maximum reservation in bytes that this tracker can have. + int64_t reservation_limit_; + + /// This tracker's current reservation in bytes. 'reservation_' <= 'reservation_limit_'. + int64_t reservation_; + + /// Total reservation of children in bytes. This is included in 'reservation_'. + /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'. + int64_t child_reservations_; + + /// The amount of the reservation currently used by this tracker in bytes. + /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'. + int64_t used_reservation_; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/runtime/mem-tracker.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc index e1251f3..a9ceb76 100644 --- a/be/src/runtime/mem-tracker.cc +++ b/be/src/runtime/mem-tracker.cc @@ -22,9 +22,10 @@ #include <gperftools/malloc_extension.h> #include <gutil/strings/substitute.h> +#include "bufferpool/reservation-tracker-counters.h" +#include "resourcebroker/resource-broker.h" #include "runtime/exec-env.h" #include "runtime/runtime-state.h" -#include "resourcebroker/resource-broker.h" #include "scheduling/query-resource-mgr.h" #include "util/debug-util.h" #include "util/mem-info.h" @@ -134,6 +135,11 @@ void MemTracker::UnregisterFromParent() { child_tracker_it_ = parent_->child_trackers_.end(); } +void MemTracker::EnableReservationReporting(const ReservationTrackerCounters& counters) { + reservation_counters_.reset(new ReservationTrackerCounters); + *reservation_counters_ = counters; +} + int64_t MemTracker::GetPoolMemReserved() const { // Pool trackers should have a pool_name_ and no limit. DCHECK(!pool_name_.empty()); @@ -205,8 +211,7 @@ shared_ptr<MemTracker> MemTracker::GetQueryMemTracker( // First time this id registered, make a new object. 
Give a shared ptr to // the caller and put a weak ptr in the map. shared_ptr<MemTracker> tracker = make_shared<MemTracker>(byte_limit, - rm_reserved_limit, Substitute("Query($0) Limit", lexical_cast<string>(id)), - parent); + rm_reserved_limit, Substitute("Query($0)", lexical_cast<string>(id)), parent); tracker->auto_unregister_ = true; tracker->query_id_ = id; request_to_mem_trackers_[id] = tracker; @@ -246,17 +251,26 @@ void MemTracker::RefreshConsumptionFromMetric() { } // Calling this on the query tracker results in output like: -// Query Limit: memory limit exceeded. Limit=100.00 MB Consumption=106.19 MB -// Fragment 5b45e83bbc2d92bd:d3ff8a7df7a2f491: Consumption=52.00 KB -// AGGREGATION_NODE (id=6): Consumption=44.00 KB -// EXCHANGE_NODE (id=5): Consumption=0.00 -// DataStreamMgr: Consumption=0.00 -// Fragment 5b45e83bbc2d92bd:d3ff8a7df7a2f492: Consumption=100.00 KB -// AGGREGATION_NODE (id=2): Consumption=36.00 KB -// AGGREGATION_NODE (id=4): Consumption=40.00 KB -// EXCHANGE_NODE (id=3): Consumption=0.00 -// DataStreamMgr: Consumption=0.00 -// DataStreamSender: Consumption=16.00 KB +// +// Query(4a4c81fedaed337d:4acadfda00000000) Limit=10.00 GB Total=508.28 MB Peak=508.45 MB +// Fragment 4a4c81fedaed337d:4acadfda00000000: Total=8.00 KB Peak=8.00 KB +// EXCHANGE_NODE (id=4): Total=0 Peak=0 +// DataStreamRecvr: Total=0 Peak=0 +// Block Manager: Limit=6.68 GB Total=394.00 MB Peak=394.00 MB +// Fragment 4a4c81fedaed337d:4acadfda00000006: Total=233.72 MB Peak=242.24 MB +// AGGREGATION_NODE (id=1): Total=139.21 MB Peak=139.84 MB +// HDFS_SCAN_NODE (id=0): Total=93.94 MB Peak=102.24 MB +// DataStreamSender (dst_id=2): Total=45.99 KB Peak=85.99 KB +// Fragment 4a4c81fedaed337d:4acadfda00000003: Total=274.55 MB Peak=274.62 MB +// AGGREGATION_NODE (id=3): Total=274.50 MB Peak=274.50 MB +// EXCHANGE_NODE (id=2): Total=0 Peak=0 +// DataStreamRecvr: Total=45.91 KB Peak=684.07 KB +// DataStreamSender (dst_id=4): Total=680.00 B Peak=680.00 B +// +// If 
'reservation_counters_' is set, we get a more granular breakdown: +// TrackerName: Limit=5.00 MB BufferPoolUsed/Reservation=0/5.00 MB OtherMemory=1.04 MB +// Total=6.04 MB Peak=6.45 MB +// string MemTracker::LogUsage(const string& prefix) const { if (!log_usage_if_zero_ && consumption() == 0) return ""; @@ -267,7 +281,24 @@ string MemTracker::LogUsage(const string& prefix) const { if (rm_reserved_limit_ > 0) { ss << " RM Limit=" << PrettyPrinter::Print(rm_reserved_limit_, TUnit::BYTES); } - ss << " Consumption=" << PrettyPrinter::Print(consumption(), TUnit::BYTES); + + int64_t total = consumption(); + int64_t peak = consumption_->value(); + if (reservation_counters_ != NULL) { + int64_t reservation = reservation_counters_->peak_reservation->current_value(); + int64_t used_reservation = + reservation_counters_->peak_used_reservation->current_value(); + int64_t reservation_limit = reservation_counters_->reservation_limit->value(); + ss << " BufferPoolUsed/Reservation=" + << PrettyPrinter::Print(used_reservation, TUnit::BYTES) << "/" + << PrettyPrinter::Print(reservation, TUnit::BYTES); + if (reservation_limit != numeric_limits<int64_t>::max()) { + ss << " BufferPoolLimit=" << PrettyPrinter::Print(reservation_limit, TUnit::BYTES); + } + ss << " OtherMemory=" << PrettyPrinter::Print(total - reservation, TUnit::BYTES); + } + ss << " Total=" << PrettyPrinter::Print(total, TUnit::BYTES) + << " Peak=" << PrettyPrinter::Print(peak, TUnit::BYTES); stringstream prefix_ss; prefix_ss << prefix << " "; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/runtime/mem-tracker.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h index 6c13af6..e3548cc 100644 --- a/be/src/runtime/mem-tracker.h +++ b/be/src/runtime/mem-tracker.h @@ -37,6 +37,7 @@ namespace impala { +class ReservationTrackerCounters; class MemTracker; class QueryResourceMgr; @@ -85,6 +86,10 @@ class
MemTracker { /// Removes this tracker from parent_->child_trackers_. void UnregisterFromParent(); + /// Include counters from a ReservationTracker in logs and other diagnostics. + /// The counters should be owned by the fragment's RuntimeProfile. + void EnableReservationReporting(const ReservationTrackerCounters& counters); + /// Returns a MemTracker object for query 'id'. Calling this with the same id will /// return the same MemTracker object. An example of how this is used is to pass it /// the same query id for all fragments of that query running on this machine. This @@ -331,6 +336,8 @@ class MemTracker { void RegisterMetrics(MetricGroup* metrics, const std::string& prefix); /// Logs the usage of this tracker and all of its children (recursively). + /// TODO: once all memory is accounted in ReservationTracker hierarchy, move + /// reporting there. std::string LogUsage(const std::string& prefix = "") const; /// Log the memory usage when memory limit is exceeded and return a status object with @@ -437,6 +444,11 @@ class MemTracker { /// NULL if consumption_metric_ is set. UIntGauge* consumption_metric_; + /// If non-NULL, counters from a corresponding ReservationTracker that should be + /// reported in logs and other diagnostics. The counters are owned by the fragment's + /// RuntimeProfile. 
+ boost::scoped_ptr<ReservationTrackerCounters> reservation_counters_; + std::vector<MemTracker*> all_trackers_; // this tracker plus all of its ancestors std::vector<MemTracker*> limit_trackers_; // all_trackers_ with valid limits http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/util/dummy-runtime-profile.h ---------------------------------------------------------------------- diff --git a/be/src/util/dummy-runtime-profile.h b/be/src/util/dummy-runtime-profile.h new file mode 100644 index 0000000..83bccbf --- /dev/null +++ b/be/src/util/dummy-runtime-profile.h @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_UTIL_DEBUG_RUNTIME_PROFILE_H +#define IMPALA_UTIL_DEBUG_RUNTIME_PROFILE_H + +#include "common/object-pool.h" +#include "util/runtime-profile.h" + +namespace impala { + +/// Utility class to create a self-contained profile that does not report anything. +/// This is useful if there is sometimes a RuntimeProfile associated with an object +/// but not always so that the object can still allocate counters in the same way. 
+class DummyProfile { + public: + DummyProfile() : pool_(), profile_(&pool_, "dummy", false) {} + RuntimeProfile* profile() { return &profile_; } + + private: + ObjectPool pool_; + RuntimeProfile profile_; +}; +} +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/util/runtime-profile-counters.h ---------------------------------------------------------------------- diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h index 7d92861..77d7938 100644 --- a/be/src/util/runtime-profile-counters.h +++ b/be/src/util/runtime-profile-counters.h @@ -485,7 +485,6 @@ class ThreadCounterMeasurement { MonotonicStopWatch sw_; RuntimeProfile::ThreadCounters* counters_; }; - } #endif
