http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/kernel_stack_watchdog.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/kernel_stack_watchdog.h b/be/src/kudu/util/kernel_stack_watchdog.h
new file mode 100644
index 0000000..79b6087
--- /dev/null
+++ b/be/src/kudu/util/kernel_stack_watchdog.h
@@ -0,0 +1,267 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//
// This class defines a singleton thread which manages a map of other thread IDs to
// watch. Before performing some operation which may stall (eg IO) or which we expect
// should be short (e.g. a callback on a critical thread that should not block), threads
// may mark themselves as "watched", with a threshold beyond which they would like
// warnings to be emitted including their stack trace at that time.
//
// In the background, a separate watchdog thread periodically wakes up, and if a thread
// has been marked longer than its provided threshold, it will dump the stack trace
// of that thread (both kernel-mode and user-mode stacks).
//
// This can be useful for diagnosing I/O stalls coming from the kernel, for example.
//
// Users will typically use the macro SCOPED_WATCH_STACK. Example usage:
//
//   // We expect the Write() to return in <100ms. If it takes longer than that
//   // we'll see warnings indicating why it is stalled.
//   {
//     SCOPED_WATCH_STACK(100);
//     file->Write(...);
//   }
//
// If the Write call takes too long, a stack trace will be logged at WARNING level.
// Note that the threshold time parameter is not a guarantee that a stall will be
// caught by the watchdog thread. The watchdog only wakes up periodically to look
// for threads that have been stalled too long. For example, if the threshold is 10ms
// and the thread blocks for only 20ms, it's quite likely that the watchdog will
// have missed the event.
//
// The SCOPED_WATCH_STACK macro is designed to have minimal overhead: approximately
// equivalent to a clock_gettime() and a single 'mfence' instruction. Micro-benchmarks
// measure the cost at about 50ns per call. Thus, it may safely be used in hot code
// paths.
//
// Scopes with SCOPED_WATCH_STACK may be nested, but only up to a hard-coded limited depth
// (currently 8).
#ifndef KUDU_UTIL_KERNEL_STACK_WATCHDOG_H
#define KUDU_UTIL_KERNEL_STACK_WATCHDOG_H

#include <string>
#include <unordered_map>
#include <vector>

#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/singleton.h"
#include "kudu/gutil/walltime.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/locks.h"
#include "kudu/util/monotime.h"
#include "kudu/util/mutex.h"
#include "kudu/util/threadlocal.h"

// Watches the enclosing scope: if the scope remains active for more than
// 'threshold_ms' milliseconds, the watchdog logs a warning tagged with the
// file:line of the macro invocation (a static string, never freed).
#define SCOPED_WATCH_STACK(threshold_ms) \
  ScopedWatchKernelStack _stack_watcher(__FILE__ ":" AS_STRING(__LINE__), threshold_ms)

namespace kudu {

class Thread;

// Singleton thread which implements the watchdog.
class KernelStackWatchdog {
 public:
  // Returns the process-wide singleton instance, constructing it on first use.
  static KernelStackWatchdog* GetInstance() {
    return Singleton<KernelStackWatchdog>::get();
  }

  // Instead of logging through glog, log warning messages into a vector.
  //
  // If 'save_logs' is true, will start saving to the vector, and forget any
  // previously logged messages.
  // If 'save_logs' is false, disables this functionality.
  void SaveLogsForTests(bool save_logs);

  // Return any log messages saved since the last call to SaveLogsForTests(true).
  std::vector<std::string> LoggedMessagesForTests() const;

 private:
  friend class Singleton<KernelStackWatchdog>;
  friend class ScopedWatchKernelStack;

  // The thread-local state which captures whether a thread should be watched by
  // the watchdog. This structure is constructed as a thread-local on first use
  // and destructed when the thread exits. Upon construction, the TLS structure
  // registers itself with the WatchDog, and on destruction, unregisters itself.
  //
  // See 'seq_lock_' below for details on thread-safe operation.
  struct TLS {
    TLS();
    ~TLS();

    enum Constants {
      // The maximum nesting depth of SCOPED_WATCH_STACK() macros.
      kMaxDepth = 8
    };

    // Because we support nested SCOPED_WATCH_STACK() macros, we need to capture
    // multiple active frames within the TLS.
    struct Frame {
      // The time at which this frame entered the SCOPED_WATCH_STACK section.
      // We use MicrosecondsInt64 instead of MonoTime because it inlines a bit
      // better.
      MicrosecondsInt64 start_time_;
      // The threshold of time beyond which the watchdog should emit warnings.
      int threshold_ms_;
      // A string explaining the state that the thread is in (typically a file:line
      // string). This is expected to be static storage and is not freed.
      const char* status_;
    };

    // The data within the TLS. This is a POD type so that the watchdog can easily
    // copy data out of a thread's TLS.
    struct Data {
      Frame frames_[kMaxDepth];
      Atomic32 depth_;

      // Counter implementing a simple "sequence lock".
      //
      // Before modifying any data inside its TLS, the watched thread increments this value so it is
      // odd. When the modifications are complete, it increments it again, making it even.
      //
      // To read the TLS data from a target thread, the watchdog thread waits for the value
      // to become even, indicating that no write is in progress. Then, it does a potentially
      // racy copy of the entire 'Data' structure. Then, it validates the value again.
      // If it is has not changed, then the snapshot is guaranteed to be consistent.
      //
      // We use this type of locking to ensure that the watched thread is as fast as possible,
      // allowing us to use SCOPED_WATCH_STACK even in hot code paths. In particular,
      // the watched thread is wait-free, since it doesn't need to loop or retry. In addition, the
      // memory is only written by that thread, eliminating any cache-line bouncing. The watchdog
      // thread may have to loop multiple times to see a consistent snapshot, but we're OK delaying
      // the watchdog arbitrarily since it isn't on any critical path.
      Atomic32 seq_lock_;

      // Take a consistent snapshot of this data into 'dst'. This may block if the target thread
      // is currently modifying its TLS.
      void SnapshotCopy(Data* dst) const;
    };
    Data data_;
  };

  KernelStackWatchdog();
  ~KernelStackWatchdog();

  // Get or create the TLS for the current thread.
  static TLS* GetTLS();

  // Register a new thread's TLS with the watchdog.
  // Called by any thread the first time it enters a watched section, when its TLS
  // is constructed.
  void Register(TLS* tls);

  // Called when a thread's TLS is destructed (i.e. when the thread exits).
  void Unregister();

  // The actual watchdog loop that the watchdog thread runs.
  void RunThread();

  DECLARE_STATIC_THREAD_LOCAL(TLS, tls_);

  // Map from thread id (pid_t) to that thread's registered TLS block.
  typedef std::unordered_map<pid_t, TLS*> TLSMap;
  TLSMap tls_by_tid_;

  // If non-NULL, warnings will be emitted into this vector instead of glog.
  // Used by tests.
  gscoped_ptr<std::vector<std::string> > log_collector_;

  // Lock protecting log_collector_.
  mutable simple_spinlock log_lock_;

  // Lock protecting tls_by_tid_.
  mutable simple_spinlock tls_lock_;

  // Lock which prevents threads from unregistering while the watchdog
  // sends signals.
  //
  // This is used to prevent the watchdog from sending a signal to a pid just
  // after the pid has actually exited and been reused. Sending a signal to
  // a non-Kudu thread could have unintended consequences.
  //
  // When this lock is held concurrently with 'tls_lock_' or 'log_lock_',
  // this lock must be acquired first.
  Mutex unregister_lock_;

  // The watchdog thread itself.
  scoped_refptr<Thread> thread_;

  // Signal to stop the watchdog.
  CountDownLatch finish_;

  DISALLOW_COPY_AND_ASSIGN(KernelStackWatchdog);
};

// Scoped object which marks the current thread for watching.
class ScopedWatchKernelStack {
 public:
  // If the current scope is active more than 'threshold_ms' milliseconds, the
  // watchdog thread will log a warning including the message 'label'. 'label'
  // is not copied or freed.
  ScopedWatchKernelStack(const char* label, int threshold_ms) {
    // A non-positive threshold disables watching for this scope entirely.
    if (threshold_ms <= 0) return;

    // Rather than just using the lazy GetTLS() method, we'll first try to load
    // the TLS ourselves. This is usually successful, and avoids us having to inline
    // the TLS construction path at call sites.
    KernelStackWatchdog::TLS* tls = KernelStackWatchdog::tls_;
    if (PREDICT_FALSE(tls == NULL)) {
      tls = KernelStackWatchdog::GetTLS();
    }
    KernelStackWatchdog::TLS::Data* tls_data = &tls->data_;

    // "Acquire" the sequence lock. While the lock value is odd, readers will block.
    // TODO: technically this barrier is stronger than we need: we are the only writer
    // to this data, so it's OK to allow loads from within the critical section to
    // reorder above this next line. All we need is a "StoreStore" barrier (i.e.
    // prevent any stores in the critical section from getting reordered above the
    // increment of the counter). However, atomicops.h doesn't provide such a barrier
    // as of yet, so we'll do the slightly more expensive one for now.
    base::subtle::Acquire_Store(&tls_data->seq_lock_, tls_data->seq_lock_ + 1);

    KernelStackWatchdog::TLS::Frame* frame = &tls_data->frames_[tls_data->depth_++];
    DCHECK_LE(tls_data->depth_, KernelStackWatchdog::TLS::kMaxDepth);
    frame->start_time_ = GetMonoTimeMicros();
    frame->threshold_ms_ = threshold_ms;
    frame->status_ = label;

    // "Release" the sequence lock. This resets the lock value to be even, so readers
    // will proceed.
    base::subtle::Release_Store(&tls_data->seq_lock_, tls_data->seq_lock_ + 1);
  }

  ~ScopedWatchKernelStack() {
    // If the TLS was never constructed, the constructor returned early
    // (threshold_ms <= 0) and there is no frame to pop.
    if (!KernelStackWatchdog::tls_) return;

    KernelStackWatchdog::TLS::Data* tls = &KernelStackWatchdog::tls_->data_;
    int d = tls->depth_;
    DCHECK_GT(d, 0);

    // We don't bother with a lock/unlock, because the change we're making here is atomic.
    // If we race with the watchdog, either they'll see the old depth_ or the new depth_,
    // but in either case the underlying data is perfectly valid.
    base::subtle::NoBarrier_Store(&tls->depth_, d - 1);
  }

 private:
  DISALLOW_COPY_AND_ASSIGN(ScopedWatchKernelStack);
};

} // namespace kudu
#endif /* KUDU_UTIL_KERNEL_STACK_WATCHDOG_H */
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/knapsack_solver-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/knapsack_solver-test.cc b/be/src/kudu/util/knapsack_solver-test.cc
new file mode 100644
index 0000000..4d95ada
--- /dev/null
+++ b/be/src/kudu/util/knapsack_solver-test.cc
@@ -0,0 +1,169 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <glog/logging.h>
#include <gtest/gtest.h>
#include <string>
#include <vector>

#include "kudu/util/knapsack_solver.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_util.h"

using std::vector;

// Test fixture; inherits common setup/teardown from KuduTest.
namespace kudu {

class TestKnapsack : public KuduTest {
};

// A simple test item for use with the knapsack solver.
// The real code will be solving knapsack over RowSet objects --
// using simple value/weight pairs in the tests makes it standalone.
struct TestItem {
  TestItem(double v, int w)
    : value(v), weight(w) {
  }

  // Value contributed if the item is chosen.
  double value;
  // Knapsack capacity consumed if the item is chosen.
  int weight;
};

// A traits class to adapt the knapsack solver to TestItem.
struct TestItemTraits {
  typedef TestItem item_type;
  typedef double value_type;
  static int get_weight(const TestItem &item) {
    return item.weight;
  }
  static value_type get_value(const TestItem &item) {
    return item.value;
  }
};

// Generate random items into the provided vector.
// Values are in (1.0, 10000.0]; weights are uniform in [0, max_weight).
static void GenerateRandomItems(int n_items, int max_weight,
                                vector<TestItem> *out) {
  for (int i = 0; i < n_items; i++) {
    double value = 10000.0 / (random() % 10000 + 1);
    int weight = random() % max_weight;
    out->push_back(TestItem(value, weight));
  }
}

// Join and stringify the given list of ints.
static string JoinInts(const vector<int> &ints) {
  string ret;
  for (int i = 0; i < ints.size(); i++) {
    if (i > 0) {
      ret.push_back(',');
    }
    ret.append(std::to_string(ints[i]));
  }
  return ret;
}

// Hand-checked instances with known optimal solutions.
TEST_F(TestKnapsack, Basics) {
  KnapsackSolver<TestItemTraits> solver;

  vector<TestItem> in;
  in.push_back(TestItem(500, 3));
  in.push_back(TestItem(110, 1));
  in.push_back(TestItem(125, 1));
  in.push_back(TestItem(100, 1));

  vector<int> out;
  double max_val;

  // For 1 weight, pick item 2
  solver.Solve(in, 1, &out, &max_val);
  ASSERT_DOUBLE_EQ(125, max_val);
  ASSERT_EQ("2", JoinInts(out));
  out.clear();

  // For 2 weight, pick item 1, 2
  solver.Solve(in, 2, &out, &max_val);
  ASSERT_DOUBLE_EQ(110 + 125, max_val);
  ASSERT_EQ("2,1", JoinInts(out));
  out.clear();

  // For 3 weight, pick item 0
  solver.Solve(in, 3, &out, &max_val);
  ASSERT_DOUBLE_EQ(500, max_val);
  ASSERT_EQ("0", JoinInts(out));
  out.clear();

  // For 10 weight, pick all.
  solver.Solve(in, 10, &out, &max_val);
  ASSERT_DOUBLE_EQ(500 + 110 + 125 + 100, max_val);
  ASSERT_EQ("3,2,1,0", JoinInts(out));
  out.clear();
}

// Test which generates random knapsack instances and verifies
// that the result satisfies the constraints.
TEST_F(TestKnapsack, Randomized) {
  SeedRandom();
  KnapsackSolver<TestItemTraits> solver;

  const int kNumTrials = AllowSlowTests() ? 200 : 1;
  const int kMaxWeight = 1000;
  const int kNumItems = 1000;

  for (int i = 0; i < kNumTrials; i++) {
    vector<TestItem> in;
    vector<int> out;
    GenerateRandomItems(kNumItems, kMaxWeight, &in);
    double max_val;
    int max_weight = random() % kMaxWeight;
    solver.Solve(in, max_weight, &out, &max_val);

    // Verify that the max_val is equal to the sum of the chosen items' values.
    // NOTE: the loop variable 'i' here shadows the trial counter above; the
    // shadowed counter is not used inside this loop, so behavior is unaffected.
    double sum_val = 0;
    int sum_weight = 0;
    for (int i : out) {
      sum_val += in[i].value;
      sum_weight += in[i].weight;
    }
    ASSERT_NEAR(max_val, sum_val, 0.000001);
    ASSERT_LE(sum_weight, max_weight);
  }
}

// Timing benchmark; compiled only in release builds (NDEBUG), since DCHECK
// overhead would skew the numbers in debug builds.
#ifdef NDEBUG
TEST_F(TestKnapsack, Benchmark) {
  KnapsackSolver<TestItemTraits> solver;

  const int kNumTrials = 1000;
  const int kMaxWeight = 1000;
  const int kNumItems = 1000;

  vector<TestItem> in;
  GenerateRandomItems(kNumItems, kMaxWeight, &in);

  LOG_TIMING(INFO, "benchmark") {
    vector<int> out;
    for (int i = 0; i < kNumTrials; i++) {
      out.clear();
      double max_val;
      solver.Solve(in, random() % kMaxWeight, &out, &max_val);
    }
  }
}
#endif

} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/knapsack_solver.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/knapsack_solver.h b/be/src/kudu/util/knapsack_solver.h
new file mode 100644
index 0000000..2c37065
--- /dev/null
+++ b/be/src/kudu/util/knapsack_solver.h
@@ -0,0 +1,269 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.
// You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef KUDU_UTIL_KNAPSACK_SOLVER_H
#define KUDU_UTIL_KNAPSACK_SOLVER_H

#include <glog/logging.h>
#include <algorithm>
#include <utility>
#include <vector>
#include "kudu/gutil/macros.h"

namespace kudu {

// Solver for the 0-1 knapsack problem. This uses dynamic programming
// to solve the problem exactly.
//
// Given a knapsack capacity of 'W' and a number of potential items 'n',
// this solver is O(nW) time and space.
//
// This implementation is cribbed from wikipedia. The only interesting
// bit here that doesn't directly match the pseudo-code is that we
// maintain the "taken" bitmap keeping track of which items were
// taken, so we can efficiently "trace back" the chosen items.
template<class Traits>
class KnapsackSolver {
 public:
  typedef typename Traits::item_type item_type;
  typedef typename Traits::value_type value_type;
  // A solution is a pair of (knapsack weight used, value obtained).
  typedef std::pair<int, value_type> solution_type;

  KnapsackSolver() {}
  ~KnapsackSolver() {}

  // Solve a knapsack problem in one shot. Finds the set of
  // items in 'items' such that their weights add up to no
  // more than 'knapsack_capacity' and maximizes the sum
  // of their values.
  // The indexes of the chosen items are stored in 'chosen_items',
  // and the maximal value is stored in 'optimal_value'.
  void Solve(std::vector<item_type> &items,
             int knapsack_capacity,
             std::vector<int>* chosen_items,
             value_type* optimal_value);


  // The following functions are a more advanced API for solving
  // knapsack problems, allowing the caller to obtain incremental
  // results as each item is considered. See the implementation of
  // Solve() for usage.

  // Prepare to solve a knapsack problem with the given capacity and
  // item set. The vector of items must remain valid and unchanged
  // until the next call to Reset().
  void Reset(int knapsack_capacity,
             const std::vector<item_type>* items);

  // Process the next item in 'items'. Returns false if there
  // were no more items to process.
  bool ProcessNext();

  // Returns the current best solution after the most recent ProcessNext
  // call, as a pair of (knapsack weight used, value obtained).
  solution_type GetSolution();

  // Trace the path of item indexes used to achieve the given best
  // solution as of the latest ProcessNext() call.
  void TracePath(const solution_type& best,
                 std::vector<int>* chosen_items);

 private:

  // The state kept by the DP algorithm.
  class KnapsackBlackboard {
   public:
    typedef std::pair<int, value_type> solution_type;
    KnapsackBlackboard() :
      n_items_(0),
      n_weights_(0),
      cur_item_idx_(0),
      best_solution_(0, 0) {
    }

    void ResizeAndClear(int n_items, int max_weight);

    // Current maximum value at the given weight
    value_type &max_at(int weight) {
      DCHECK_GE(weight, 0);
      DCHECK_LT(weight, n_weights_);
      return max_value_[weight];
    }

    // Consider the next item to be put into the knapsack
    // Moves the "state" of the solution forward
    void Advance(value_type new_val, int new_wt);

    // How many items have been considered
    int current_item_index() const { return cur_item_idx_; }

    bool item_taken(int item, int weight) const {
      DCHECK_GE(weight, 0);
      DCHECK_LT(weight, n_weights_);
      DCHECK_GE(item, 0);
      DCHECK_LT(item, n_items_);
      return item_taken_[index(item, weight)];
    }

    solution_type best_solution() { return best_solution_; }

    bool done() { return cur_item_idx_ == n_items_; }

   private:
    void MarkTaken(int item, int weight) {
      item_taken_[index(item, weight)] = true;
    }

    // If the dynamic programming matrix has more than this number of cells,
    // then warn.
    static const int kWarnDimension = 10000000;

    // Row-major index into the flattened (item x weight) 'taken' matrix.
    int index(int item, int weight) const {
      return n_weights_ * item + weight;
    }

    // vector with maximum value at the i-th position meaning that it is
    // the maximum value you can get given a knapsack of weight capacity i
    // while only considering items 0..cur_item_idx_-1
    std::vector<value_type> max_value_;
    std::vector<bool> item_taken_;  // TODO: record difference vectors?
    int n_items_, n_weights_;
    int cur_item_idx_;
    // Best current solution
    solution_type best_solution_;

    DISALLOW_COPY_AND_ASSIGN(KnapsackBlackboard);
  };

  KnapsackBlackboard bb_;
  const std::vector<item_type>* items_;
  int knapsack_capacity_;

  DISALLOW_COPY_AND_ASSIGN(KnapsackSolver);
};

template<class Traits>
inline void KnapsackSolver<Traits>::Reset(int knapsack_capacity,
                                          const std::vector<item_type>* items) {
  DCHECK_GE(knapsack_capacity, 0);
  items_ = items;
  knapsack_capacity_ = knapsack_capacity;
  bb_.ResizeAndClear(items->size(), knapsack_capacity);
}

template<class Traits>
inline bool KnapsackSolver<Traits>::ProcessNext() {
  if (bb_.done()) return false;

  const item_type& item = (*items_)[bb_.current_item_index()];
  int item_weight = Traits::get_weight(item);
  value_type item_value = Traits::get_value(item);
  bb_.Advance(item_value, item_weight);

  return true;
}

template<class Traits>
inline void KnapsackSolver<Traits>::Solve(std::vector<item_type> &items,
                                          int knapsack_capacity,
                                          std::vector<int>* chosen_items,
                                          value_type* optimal_value) {
  Reset(knapsack_capacity, &items);

  while (ProcessNext()) {
  }

  solution_type best = GetSolution();
  *optimal_value = best.second;
  TracePath(best, chosen_items);
}

template<class Traits>
inline typename KnapsackSolver<Traits>::solution_type KnapsackSolver<Traits>::GetSolution() {
  return bb_.best_solution();
}

template<class Traits>
inline void KnapsackSolver<Traits>::TracePath(const solution_type& best,
                                              std::vector<int>* chosen_items) {
  chosen_items->clear();
  // Retrace back which set of items corresponded to this value.
  int w = best.first;
  // NOTE(review): this second clear() is redundant -- the vector was already
  // cleared above. Harmless, but one of the two could be removed.
  chosen_items->clear();
  for (int k = bb_.current_item_index() - 1; k >= 0; k--) {
    if (bb_.item_taken(k, w)) {
      const item_type& taken = (*items_)[k];
      chosen_items->push_back(k);
      w -= Traits::get_weight(taken);
      DCHECK_GE(w, 0);
    }
  }
}

template<class Traits>
void KnapsackSolver<Traits>::KnapsackBlackboard::ResizeAndClear(int n_items,
                                                                int max_weight) {
  CHECK_GT(n_items, 0);
  CHECK_GE(max_weight, 0);

  // Rather than zero-indexing the weights, we size the array from
  // 0 to max_weight. This avoids having to subtract 1 every time
  // we index into the array.
  n_weights_ = max_weight + 1;
  max_value_.resize(n_weights_);

  int dimension = index(n_items, n_weights_);
  if (dimension > kWarnDimension) {
    LOG(WARNING) << "Knapsack problem " << n_items << "x" << n_weights_
                 << " is large: may be inefficient!";
  }
  item_taken_.resize(dimension);
  n_items_ = n_items;

  // Clear
  std::fill(max_value_.begin(), max_value_.end(), 0);
  std::fill(item_taken_.begin(), item_taken_.end(), false);
  best_solution_ = std::make_pair(0, 0);

  cur_item_idx_ = 0;
}

template<class Traits>
void KnapsackSolver<Traits>::KnapsackBlackboard::Advance(value_type new_val, int new_wt) {
  // Use the dynamic programming formula:
  // Define mv(i, j) as maximum value considering items 0..i-1 with knapsack weight j
  // Then:
  //   if j - weight(i) >= 0, then:
  //     mv(i, j) = max(mv(i-1, j), mv(i-1, j-weight(i)) + value(i))
  //   else mv(i, j) = mv(i-1, j)
  // Since the recursive formula requires an access of j-weight(i), we go in reverse.
  for (int j = n_weights_ - 1; j >= new_wt ; --j) {
    value_type val_if_taken = max_value_[j - new_wt] + new_val;
    if (max_value_[j] < val_if_taken) {
      max_value_[j] = val_if_taken;
      MarkTaken(cur_item_idx_, j);
      // Check if new solution found
      if (best_solution_.second < val_if_taken) {
        best_solution_ = std::make_pair(j, val_if_taken);
      }
    }
  }

  cur_item_idx_++;
}

} // namespace kudu
#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/locks.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/locks.cc b/be/src/kudu/util/locks.cc
new file mode 100644
index 0000000..bcb7201
--- /dev/null
+++ b/be/src/kudu/util/locks.cc
@@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "kudu/util/locks.h"

#include "kudu/util/malloc.h"

namespace kudu {

size_t percpu_rwlock::memory_footprint_excluding_this() const {
  // Because locks_ is a dynamic array of non-trivially-destructable types,
  // the returned pointer from new[] isn't guaranteed to point at the start of
  // a memory block, rendering it useless for malloc_usable_size().
  //
  // Rather than replace locks_ with a vector or something equivalent, we'll
  // just measure the memory footprint using sizeof(), with the understanding
  // that we might be inaccurate due to malloc "slop".
  //
  // See https://code.google.com/p/address-sanitizer/issues/detail?id=395 for
  // more details.
  return n_cpus_ * sizeof(padded_lock);
}

size_t percpu_rwlock::memory_footprint_including_this() const {
  return kudu_malloc_usable_size(this) + memory_footprint_excluding_this();
}

} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/locks.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/locks.h b/be/src/kudu/util/locks.h
new file mode 100644
index 0000000..36d6984
--- /dev/null
+++ b/be/src/kudu/util/locks.h
@@ -0,0 +1,285 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef KUDU_UTIL_LOCKS_H
#define KUDU_UTIL_LOCKS_H

#include <algorithm>
#include <glog/logging.h>
#include <mutex>

#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/dynamic_annotations.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/spinlock.h"
#include "kudu/gutil/sysinfo.h"
#include "kudu/util/rw_semaphore.h"

namespace kudu {

using base::subtle::Acquire_CompareAndSwap;
using base::subtle::NoBarrier_Load;
using base::subtle::Release_Store;

// Wrapper around the Google SpinLock class to adapt it to the method names
// expected by Boost.
class simple_spinlock {
 public:
  simple_spinlock() {}

  // Block until the lock is acquired.
  void lock() {
    l_.Lock();
  }

  // Release the lock; the caller must hold it.
  void unlock() {
    l_.Unlock();
  }

  // Attempt to acquire without blocking; returns true on success.
  bool try_lock() {
    return l_.TryLock();
  }

  // Return whether the lock is currently held.
  //
  // This state can change at any instant, so this is only really useful
  // for assertions where you expect to hold the lock. The success of
  // such an assertion isn't a guarantee that the current thread is the
  // holder, but the failure of such an assertion _is_ a guarantee that
  // the current thread is _not_ holding the lock!
  bool is_locked() {
    return l_.IsHeld();
  }

 private:
  base::SpinLock l_;

  DISALLOW_COPY_AND_ASSIGN(simple_spinlock);
};

// simple_spinlock padded out to a full cache line, to avoid false sharing
// when locks are placed in arrays.
struct padded_spinlock : public simple_spinlock {
  char padding[CACHELINE_SIZE - (sizeof(simple_spinlock) % CACHELINE_SIZE)];
};

// Reader-writer lock.
// This is functionally equivalent to rw_semaphore in rw_semaphore.h, but should be
// used whenever the lock is expected to only be acquired on a single thread.
// It adds TSAN annotations which will detect misuse of the lock, but those
// annotations also assume that the same thread the takes the lock will unlock it.
//
// See rw_semaphore.h for documentation on the individual methods where unclear.
class rw_spinlock {
 public:
  rw_spinlock() {
    ANNOTATE_RWLOCK_CREATE(this);
  }
  ~rw_spinlock() {
    ANNOTATE_RWLOCK_DESTROY(this);
  }

  // Acquire in shared (reader) mode.
  void lock_shared() {
    sem_.lock_shared();
    ANNOTATE_RWLOCK_ACQUIRED(this, 0);
  }

  void unlock_shared() {
    ANNOTATE_RWLOCK_RELEASED(this, 0);
    sem_.unlock_shared();
  }

  // Attempt to acquire in exclusive (writer) mode without blocking.
  bool try_lock() {
    bool ret = sem_.try_lock();
    if (ret) {
      ANNOTATE_RWLOCK_ACQUIRED(this, 1);
    }
    return ret;
  }

  // Acquire in exclusive (writer) mode.
  void lock() {
    sem_.lock();
    ANNOTATE_RWLOCK_ACQUIRED(this, 1);
  }

  void unlock() {
    ANNOTATE_RWLOCK_RELEASED(this, 1);
    sem_.unlock();
  }

  bool is_write_locked() const {
    return sem_.is_write_locked();
  }

  bool is_locked() const {
    return sem_.is_locked();
  }

 private:
  rw_semaphore sem_;
};

// A reader-writer lock implementation which is biased for use cases where
// the write lock is taken infrequently, but the read lock is used often.
//
// Internally, this creates N underlying mutexes, one per CPU. When a thread
// wants to lock in read (shared) mode, it locks only its own CPU's mutex. When it
// wants to lock in write (exclusive) mode, it locks all CPU's mutexes.
//
// This means that in the read-mostly case, different readers will not cause any
// cacheline contention.
//
// Usage:
//   percpu_rwlock mylock;
//
//   // Lock shared:
//   {
//     boost::shared_lock<rw_spinlock> lock(mylock.get_lock());
//     ...
//   }
//
//   // Lock exclusive:
//
//   {
//     boost::lock_guard<percpu_rwlock> lock(mylock);
//     ...
//   }
class percpu_rwlock {
 public:
  percpu_rwlock() {
#if defined(__APPLE__) || defined(THREAD_SANITIZER)
    // OSX doesn't have a way to get the index of the CPU running this thread, so
    // we'll just use a single lock.
    //
    // TSAN limits the number of simultaneous lock acquisitions to 64, so we
    // can't create one lock per core on machines with lots of cores. So, we'll
    // also just use a single lock.
    n_cpus_ = 1;
#else
    n_cpus_ = base::MaxCPUIndex() + 1;
#endif
    CHECK_GT(n_cpus_, 0);
    locks_ = new padded_lock[n_cpus_];
  }

  ~percpu_rwlock() {
    delete [] locks_;
  }

  // Return the lock associated with the CPU currently running this thread.
  // Note the thread may migrate CPUs between this call and locking; that is
  // still safe -- it only loses the cache-locality benefit.
  rw_spinlock &get_lock() {
#if defined(__APPLE__) || defined(THREAD_SANITIZER)
    int cpu = 0;
#else
    int cpu = sched_getcpu();
    CHECK_LT(cpu, n_cpus_);
#endif // defined(__APPLE__)
    return locks_[cpu].lock;
  }

  // Attempt to acquire all per-CPU locks (i.e. exclusive mode) without
  // blocking; on failure, any locks already taken are released before
  // returning false.
  bool try_lock() {
    for (int i = 0; i < n_cpus_; i++) {
      if (!locks_[i].lock.try_lock()) {
        while (i--) {
          locks_[i].lock.unlock();
        }
        return false;
      }
    }
    return true;
  }

  // Return true if this lock is held on any CPU.
  // See simple_spinlock::is_locked() for details about where this is useful.
  bool is_locked() const {
    for (int i = 0; i < n_cpus_; i++) {
      if (locks_[i].lock.is_locked()) return true;
    }
    return false;
  }

  // Acquire every per-CPU lock (exclusive mode).
  void lock() {
    for (int i = 0; i < n_cpus_; i++) {
      locks_[i].lock.lock();
    }
  }

  void unlock() {
    for (int i = 0; i < n_cpus_; i++) {
      locks_[i].lock.unlock();
    }
  }

  // Returns the memory usage of this object without the object itself. Should
  // be used when embedded inside another object.
  size_t memory_footprint_excluding_this() const;

  // Returns the memory usage of this object including the object itself.
  // Should be used when allocated on the heap.
  size_t memory_footprint_including_this() const;

 private:
  struct padded_lock {
    rw_spinlock lock;
    char padding[CACHELINE_SIZE - (sizeof(rw_spinlock) % CACHELINE_SIZE)];
  };

  int n_cpus_;
  padded_lock *locks_;
};

// Simple implementation of the std::shared_lock API, which is not available in
// the standard library until C++14. Defers error checking to the underlying
// mutex.
+ +template <typename Mutex> +class shared_lock { + public: + shared_lock() + : m_(nullptr) { + } + + explicit shared_lock(Mutex& m) + : m_(&m) { + m_->lock_shared(); + } + + shared_lock(Mutex& m, std::try_to_lock_t t) + : m_(nullptr) { + if (m.try_lock_shared()) { + m_ = &m; + } + } + + bool owns_lock() const { + return m_; + } + + void swap(shared_lock& other) { + std::swap(m_,other.m_); + } + + ~shared_lock() { + if (m_ != nullptr) { + m_->unlock_shared(); + } + } + + private: + Mutex* m_; + DISALLOW_COPY_AND_ASSIGN(shared_lock<Mutex>); +}; + +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/logging-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/logging-test.cc b/be/src/kudu/util/logging-test.cc new file mode 100644 index 0000000..e3a0771 --- /dev/null +++ b/be/src/kudu/util/logging-test.cc @@ -0,0 +1,222 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <atomic> +#include <glog/logging.h> +#include <gmock/gmock.h> +#include <string> +#include <thread> +#include <vector> + +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/async_logger.h" +#include "kudu/util/barrier.h" +#include "kudu/util/locks.h" +#include "kudu/util/logging.h" +#include "kudu/util/logging_test_util.h" +#include "kudu/util/monotime.h" +#include "kudu/util/stopwatch.h" +#include "kudu/util/test_util.h" + +using std::string; +using std::vector; + +namespace kudu { + +// Test the KLOG_EVERY_N_SECS(...) macro. +TEST(LoggingTest, TestThrottledLogging) { + StringVectorSink sink; + ScopedRegisterSink srs(&sink); + + for (int i = 0; i < 10000; i++) { + KLOG_EVERY_N_SECS(INFO, 1) << "test" << THROTTLE_MSG; + SleepFor(MonoDelta::FromMilliseconds(1)); + if (sink.logged_msgs().size() >= 2) break; + } + const vector<string>& msgs = sink.logged_msgs(); + ASSERT_GE(msgs.size(), 2); + + // The first log line shouldn't have a suppression count. + EXPECT_THAT(msgs[0], testing::ContainsRegex("test$")); + // The second one should have suppressed at least three digits worth of log messages. + EXPECT_THAT(msgs[1], testing::ContainsRegex("\\[suppressed [0-9]{3,} similar messages\\]")); +} + +TEST(LoggingTest, TestAdvancedThrottling) { + StringVectorSink sink; + ScopedRegisterSink srs(&sink); + + logging::LogThrottler throttle_a; + + // First, log only using a single tag and throttler. + for (int i = 0; i < 100000; i++) { + KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_a") << "test" << THROTTLE_MSG; + SleepFor(MonoDelta::FromMilliseconds(1)); + if (sink.logged_msgs().size() >= 2) break; + } + auto& msgs = sink.logged_msgs(); + ASSERT_GE(msgs.size(), 2); + + // The first log line shouldn't have a suppression count. + EXPECT_THAT(msgs[0], testing::ContainsRegex("test$")); + // The second one should have suppressed at least three digits worth of log messages. 
+ EXPECT_THAT(msgs[1], testing::ContainsRegex("\\[suppressed [0-9]{3,} similar messages\\]"));
+ msgs.clear();
+
+ // Now, try logging using two different tags in rapid succession. This should not
+ // throttle, because the tag is switching.
+ KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_b") << "test b" << THROTTLE_MSG;
+ KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_b") << "test b" << THROTTLE_MSG;
+ KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_c") << "test c" << THROTTLE_MSG;
+ KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_b") << "test b" << THROTTLE_MSG;
+ ASSERT_EQ(msgs.size(), 3);
+ EXPECT_THAT(msgs[0], testing::ContainsRegex("test b$"));
+ EXPECT_THAT(msgs[1], testing::ContainsRegex("test c$"));
+ EXPECT_THAT(msgs[2], testing::ContainsRegex("test b$"));
+}
+
+// Test Logger implementation that just counts the number of messages
+// and flushes.
+//
+// This is purposefully thread-unsafe because we expect that the
+// AsyncLogger is only accessing the underlying logger from a single
+// thread.
+class CountingLogger : public google::base::Logger {
+ public:
+ void Write(bool force_flush,
+ time_t /*timestamp*/,
+ const char* /*message*/,
+ int /*message_len*/) override {
+ message_count_++;
+ if (force_flush) {
+ Flush();
+ }
+ }
+
+ void Flush() override {
+ // Simulate a slow disk.
+ SleepFor(MonoDelta::FromMilliseconds(5));
+ flush_count_++;
+ }
+
+ uint32 LogSize() override {
+ return 0;
+ }
+
+ std::atomic<int> flush_count_ = {0};
+ std::atomic<int> message_count_ = {0};
+};
+
+TEST(LoggingTest, TestAsyncLogger) {
+ const int kNumThreads = 4;
+ const int kNumMessages = 10000;
+ const int kBuffer = 10000;
+ CountingLogger base;
+ AsyncLogger async(&base, kBuffer);
+ async.Start();
+
+ vector<std::thread> threads;
+ Barrier go_barrier(kNumThreads + 1);
+ // Start some threads writing log messages. 
+ for (int i = 0; i < kNumThreads; i++) { + threads.emplace_back([&]() { + go_barrier.Wait(); + for (int m = 0; m < kNumMessages; m++) { + async.Write(true, m, "x", 1); + } + }); + } + + // And a thread calling Flush(). + threads.emplace_back([&]() { + go_barrier.Wait(); + for (int i = 0; i < 10; i++) { + async.Flush(); + SleepFor(MonoDelta::FromMilliseconds(3)); + } + }); + + for (auto& t : threads) { + t.join(); + } + async.Stop(); + ASSERT_EQ(base.message_count_, kNumMessages * kNumThreads); + // The async logger should only flush once per "batch" rather than + // once per message, even though we wrote every message with + // 'flush' set to true. + ASSERT_LT(base.flush_count_, kNumMessages * kNumThreads); + ASSERT_GT(async.app_threads_blocked_count_for_tests(), 0); +} + +TEST(LoggingTest, TestAsyncLoggerAutoFlush) { + const int kBuffer = 10000; + CountingLogger base; + AsyncLogger async(&base, kBuffer); + + FLAGS_logbufsecs = 1; + async.Start(); + + // Write some log messages with non-force_flush types. + async.Write(false, 0, "test-x", 1); + async.Write(false, 1, "test-y", 1); + + // The flush wait timeout might take a little bit of time to run. + ASSERT_EVENTUALLY([&]() { + ASSERT_EQ(base.message_count_, 2); + // The AsyncLogger should have flushed at least once by the timer automatically + // so there should be no more messages in the buffer. + ASSERT_GT(base.flush_count_, 0); + }); + async.Stop(); +} + +// Basic test that the redaction utilities work as expected. +TEST(LoggingTest, TestRedactionBasic) { + ASSERT_STREQ("<redacted>", KUDU_REDACT("hello")); + { + ScopedDisableRedaction no_redaction; + ASSERT_STREQ("hello", KUDU_REDACT("hello")); + } + ASSERT_STREQ("hello", KUDU_DISABLE_REDACTION(KUDU_REDACT("hello"))); +} + +// Typically, ToString() methods apply to some complex object with a bunch +// of fields, some of which are user data (need redaction) and others of which +// are not. 
This shows an example of a such a function, which will behave +// differently based on whether the calling scope has explicitly disabled +// redaction. +string SomeComplexStringify(const string& public_data, const string& private_data) { + return strings::Substitute("public=$0, private=$1", + public_data, + KUDU_REDACT(private_data)); +} + +TEST(LoggingTest, TestRedactionIllustrateUsage) { + // By default, the private data will be redacted. + ASSERT_EQ("public=abc, private=<redacted>", SomeComplexStringify("abc", "def")); + + // We can wrap the expression in KUDU_DISABLE_REDACTION(...) to evaluate it + // with redaction temporarily disabled. + ASSERT_EQ("public=abc, private=def", KUDU_DISABLE_REDACTION(SomeComplexStringify("abc", "def"))); + + // Or we can execute an entire scope with redaction disabled. + KUDU_DISABLE_REDACTION({ + ASSERT_EQ("public=abc, private=def", SomeComplexStringify("abc", "def")); + }); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/logging.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/logging.cc b/be/src/kudu/util/logging.cc new file mode 100644 index 0000000..c62bfe7 --- /dev/null +++ b/be/src/kudu/util/logging.cc @@ -0,0 +1,397 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "kudu/util/logging.h" + +#include <stdio.h> + +#include <algorithm> +#include <fstream> +#include <iostream> +#include <mutex> +#include <sstream> +#include <utility> +#include <vector> + +#include <boost/uuid/uuid.hpp> +#include <boost/uuid/uuid_generators.hpp> +#include <boost/uuid/uuid_io.hpp> +#include <glog/logging.h> + +#include "kudu/gutil/callback.h" +#include "kudu/gutil/spinlock.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/async_logger.h" +#include "kudu/util/debug-util.h" +#include "kudu/util/debug/leakcheck_disabler.h" +#include "kudu/util/env.h" +#include "kudu/util/env_util.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/minidump.h" +#include "kudu/util/signal.h" +#include "kudu/util/status.h" + +DEFINE_string(log_filename, "", + "Prefix of log filename - " + "full path is <log_dir>/<log_filename>.[INFO|WARN|ERROR|FATAL]"); +TAG_FLAG(log_filename, stable); + +DEFINE_bool(log_async, true, + "Enable asynchronous writing to log files. This improves " + "latency and stability."); +TAG_FLAG(log_async, hidden); + +DEFINE_int32(log_async_buffer_bytes_per_level, 2 * 1024 * 1024, + "The number of bytes of buffer space used by each log " + "level. Only relevant when --log_async is enabled."); +TAG_FLAG(log_async_buffer_bytes_per_level, hidden); + +DEFINE_int32(max_log_files, 10, + "Maximum number of log files to retain per severity level. The most recent " + "log files are retained. 
If set to 0, all log files are retained."); +TAG_FLAG(max_log_files, runtime); +TAG_FLAG(max_log_files, experimental); + +#define PROJ_NAME "kudu" + +bool logging_initialized = false; + +using namespace std; // NOLINT(*) +using namespace boost::uuids; // NOLINT(*) + +using base::SpinLock; +using base::SpinLockHolder; + +namespace kudu { + +__thread bool tls_redact_user_data = true; +bool g_should_redact_log; +bool g_should_redact_flag; +const char* const kRedactionMessage = "<redacted>"; + +namespace { + +class SimpleSink : public google::LogSink { + public: + explicit SimpleSink(LoggingCallback cb) : cb_(std::move(cb)) {} + + virtual ~SimpleSink() OVERRIDE { + } + + virtual void send(google::LogSeverity severity, const char* full_filename, + const char* base_filename, int line, + const struct ::tm* tm_time, + const char* message, size_t message_len) OVERRIDE { + LogSeverity kudu_severity; + switch (severity) { + case google::INFO: + kudu_severity = SEVERITY_INFO; + break; + case google::WARNING: + kudu_severity = SEVERITY_WARNING; + break; + case google::ERROR: + kudu_severity = SEVERITY_ERROR; + break; + case google::FATAL: + kudu_severity = SEVERITY_FATAL; + break; + default: + LOG(FATAL) << "Unknown glog severity: " << severity; + } + cb_.Run(kudu_severity, full_filename, line, tm_time, message, message_len); + } + + private: + + LoggingCallback cb_; +}; + +SpinLock logging_mutex(base::LINKER_INITIALIZED); + +// There can only be a single instance of a SimpleSink. +// +// Protected by 'logging_mutex'. +SimpleSink* registered_sink = nullptr; + +// Records the logging severity after the first call to +// InitGoogleLoggingSafe{Basic}. Calls to UnregisterLoggingCallback() +// will restore stderr logging back to this severity level. +// +// Protected by 'logging_mutex'. +int initial_stderr_severity; + +void EnableAsyncLogging() { + debug::ScopedLeakCheckDisabler leaky; + + // Enable Async for every level except for FATAL. 
Fatal should be synchronous + // to ensure that we get the fatal log message written before exiting. + for (auto level : { google::INFO, google::WARNING, google::ERROR }) { + auto* orig = google::base::GetLogger(level); + auto* async = new AsyncLogger(orig, FLAGS_log_async_buffer_bytes_per_level); + async->Start(); + google::base::SetLogger(level, async); + } +} + +void UnregisterLoggingCallbackUnlocked() { + CHECK(logging_mutex.IsHeld()); + CHECK(registered_sink); + + // Restore logging to stderr, then remove our sink. This ordering ensures + // that no log messages are missed. + google::SetStderrLogging(initial_stderr_severity); + google::RemoveLogSink(registered_sink); + delete registered_sink; + registered_sink = nullptr; +} + +void FlushCoverageOnExit() { + // Coverage flushing is not re-entrant, but this might be called from a + // crash signal context, so avoid re-entrancy. + static __thread bool in_call = false; + if (in_call) return; + in_call = true; + + // The failure writer will be called multiple times per exit. + // We only need to flush coverage once. We use a 'once' here so that, + // if another thread is already flushing, we'll block and wait for them + // to finish before allowing this thread to call abort(). + static std::once_flag once; + std::call_once(once, [] { + static const char msg[] = "Flushing coverage data before crash...\n"; + write(STDERR_FILENO, msg, arraysize(msg)); + TryFlushCoverage(); + }); + in_call = false; +} + +// On SEGVs, etc, glog will call this function to write the error to stderr. This +// implementation is copied from glog with the exception that we also flush coverage +// the first time it's called. +// +// NOTE: this is only used in coverage builds! +void FailureWriterWithCoverage(const char* data, int size) { + FlushCoverageOnExit(); + + // Original implementation from glog: + if (write(STDERR_FILENO, data, size) < 0) { + // Ignore errors. + } +} + +// GLog "failure function". 
This is called in the case of LOG(FATAL) to +// ensure that we flush coverage even on crashes. +// +// NOTE: this is only used in coverage builds! +void FlushCoverageAndAbort() { + FlushCoverageOnExit(); + abort(); +} +} // anonymous namespace + +void InitGoogleLoggingSafe(const char* arg) { + SpinLockHolder l(&logging_mutex); + if (logging_initialized) return; + + google::InstallFailureSignalHandler(); + + if (!FLAGS_log_filename.empty()) { + for (int severity = google::INFO; severity <= google::FATAL; ++severity) { + google::SetLogSymlink(severity, FLAGS_log_filename.c_str()); + } + } + + // This forces our logging to use /tmp rather than looking for a + // temporary directory if none is specified. This is done so that we + // can reliably construct the log file name without duplicating the + // complex logic that glog uses to guess at a temporary dir. + if (FLAGS_log_dir.empty()) { + FLAGS_log_dir = "/tmp"; + } + + if (!FLAGS_logtostderr) { + // Verify that a log file can be created in log_dir by creating a tmp file. + ostringstream ss; + random_generator uuid_generator; + ss << FLAGS_log_dir << "/" << PROJ_NAME "_test_log." << uuid_generator(); + const string file_name = ss.str(); + ofstream test_file(file_name.c_str()); + if (!test_file.is_open()) { + ostringstream error_msg; + error_msg << "Could not open file in log_dir " << FLAGS_log_dir; + perror(error_msg.str().c_str()); + // Unlock the mutex before exiting the program to avoid mutex d'tor assert. + logging_mutex.Unlock(); + exit(1); + } + remove(file_name.c_str()); + } + + google::InitGoogleLogging(arg); + + // In coverage builds, we should flush coverage before exiting on crash. + // This way, fault injection tests still capture coverage of the daemon + // that "crashed". + if (IsCoverageBuild()) { + // We have to use both the "failure writer" and the "FailureFunction". + // This allows us to handle both LOG(FATAL) and unintended crashes like + // SEGVs. 
+ google::InstallFailureWriter(FailureWriterWithCoverage); + google::InstallFailureFunction(FlushCoverageAndAbort); + } + + // Needs to be done after InitGoogleLogging + if (FLAGS_log_filename.empty()) { + CHECK_STRNE(google::ProgramInvocationShortName(), "UNKNOWN") + << ": must initialize gflags before glog"; + FLAGS_log_filename = google::ProgramInvocationShortName(); + } + + // File logging: on. + // Stderr logging threshold: FLAGS_stderrthreshold. + // Sink logging: off. + initial_stderr_severity = FLAGS_stderrthreshold; + + // Ignore SIGPIPE early in the startup process so that threads writing to TLS + // sockets do not crash when writing to a closed socket. See KUDU-1910. + IgnoreSigPipe(); + + // For minidump support. Must be called before logging threads started. + CHECK_OK(BlockSigUSR1()); + + if (FLAGS_log_async) { + EnableAsyncLogging(); + } + + logging_initialized = true; +} + +void InitGoogleLoggingSafeBasic(const char* arg) { + SpinLockHolder l(&logging_mutex); + if (logging_initialized) return; + + google::InitGoogleLogging(arg); + + // This also disables file-based logging. + google::LogToStderr(); + + // File logging: off. + // Stderr logging threshold: INFO. + // Sink logging: off. + initial_stderr_severity = google::INFO; + logging_initialized = true; +} + +void RegisterLoggingCallback(const LoggingCallback& cb) { + SpinLockHolder l(&logging_mutex); + CHECK(logging_initialized); + + if (registered_sink) { + LOG(WARNING) << "Cannot register logging callback: one already registered"; + return; + } + + // AddLogSink() claims to take ownership of the sink, but it doesn't + // really; it actually expects it to remain valid until + // google::ShutdownGoogleLogging() is called. + registered_sink = new SimpleSink(cb); + google::AddLogSink(registered_sink); + + // Even when stderr logging is ostensibly off, it's still emitting + // ERROR-level stuff. This is the default. 
+ google::SetStderrLogging(google::ERROR); + + // File logging: yes, if InitGoogleLoggingSafe() was called earlier. + // Stderr logging threshold: ERROR. + // Sink logging: on. +} + +void UnregisterLoggingCallback() { + SpinLockHolder l(&logging_mutex); + CHECK(logging_initialized); + + if (!registered_sink) { + LOG(WARNING) << "Cannot unregister logging callback: none registered"; + return; + } + + UnregisterLoggingCallbackUnlocked(); + // File logging: yes, if InitGoogleLoggingSafe() was called earlier. + // Stderr logging threshold: initial_stderr_severity. + // Sink logging: off. +} + +void GetFullLogFilename(google::LogSeverity severity, string* filename) { + ostringstream ss; + ss << FLAGS_log_dir << "/" << FLAGS_log_filename << "." + << google::GetLogSeverityName(severity); + *filename = ss.str(); +} + +void ShutdownLoggingSafe() { + SpinLockHolder l(&logging_mutex); + if (!logging_initialized) return; + + if (registered_sink) { + UnregisterLoggingCallbackUnlocked(); + } + + google::ShutdownGoogleLogging(); + + logging_initialized = false; +} + +Status DeleteExcessLogFiles(Env* env) { + int32_t max_log_files = FLAGS_max_log_files; + // Ignore bad input or disable log rotation. + if (max_log_files <= 0) return Status::OK(); + + for (int severity = 0; severity < google::NUM_SEVERITIES; ++severity) { + // Build glob pattern for input + // e.g. /var/log/kudu/kudu-master.*.INFO.* + string pattern = strings::Substitute("$0/$1.*.$2.*", FLAGS_log_dir, FLAGS_log_filename, + google::GetLogSeverityName(severity)); + + // Keep the 'max_log_files' most recent log files, as compared by + // modification time. Glog files contain a second-granularity timestamp in + // the name, so this could potentially use the filename sort order as + // guaranteed by glob, however this code has been adapted from Impala which + // uses mtime to determine which files to delete, and there haven't been any + // issues in production settings. 
+ RETURN_NOT_OK(env_util::DeleteExcessFilesByPattern(env, pattern, max_log_files)); + } + return Status::OK(); +} + +// Support for the special THROTTLE_MSG token in a log message stream. +ostream& operator<<(ostream &os, const PRIVATE_ThrottleMsg& /*unused*/) { + using google::LogMessage; +#ifdef DISABLE_RTTI + LogMessage::LogStream *log = static_cast<LogMessage::LogStream*>(&os); +#else + LogMessage::LogStream *log = dynamic_cast<LogMessage::LogStream*>(&os); +#endif + CHECK(log && log == log->self()) + << "You must not use COUNTER with non-glog ostream"; + int ctr = log->ctr(); + if (ctr > 0) { + os << " [suppressed " << ctr << " similar messages]"; + } + return os; +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/logging.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/logging.h b/be/src/kudu/util/logging.h new file mode 100644 index 0000000..682431e --- /dev/null +++ b/be/src/kudu/util/logging.h @@ -0,0 +1,359 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#ifndef KUDU_UTIL_LOGGING_H +#define KUDU_UTIL_LOGGING_H + +#include <string> +#include <gflags/gflags_declare.h> +#include <glog/logging.h> + +#include "kudu/gutil/atomicops.h" +#include "kudu/gutil/dynamic_annotations.h" +#include "kudu/gutil/macros.h" +#include "kudu/gutil/walltime.h" +#include "kudu/util/flags.h" +#include "kudu/util/logging_callback.h" +#include "kudu/util/status.h" + +//////////////////////////////////////////////////////////////////////////////// +// Redaction support +//////////////////////////////////////////////////////////////////////////////// + +// Disable redaction of user data while evaluating the expression 'expr'. +// This may be used inline as an expression, such as: +// +// LOG(INFO) << KUDU_DISABLE_REDACTION(schema.DebugRow(my_row)); +// +// or with a block: +// +// KUDU_DISABLE_REDACTION({ +// LOG(INFO) << schema.DebugRow(my_row); +// }); +// +// Redaction should be disabled in the following cases: +// +// 1) Outputting strings to a "secure" endpoint (for example an authenticated and authorized +// web UI) +// +// 2) Using methods like schema.DebugRow(...) when the parameter is not in fact a user-provided +// row, but instead some piece of metadata such as a partition boundary. +#define KUDU_DISABLE_REDACTION(expr) ([&]() { \ + kudu::ScopedDisableRedaction s; \ + return (expr); \ + })() + +// Evaluates to 'true' if the caller should redact any user data in the current scope. +// Most callers should instead use KUDU_REDACT(...) defined below, but this can be useful +// to short-circuit expensive logic. +#define KUDU_SHOULD_REDACT() (kudu::g_should_redact_log && kudu::tls_redact_user_data) + +// Either evaluate and return 'expr', or return the string "<redacted>", depending on whether +// redaction is enabled in the current scope. +#define KUDU_REDACT(expr) \ + (KUDU_SHOULD_REDACT() ? 
kRedactionMessage : (expr)) + +// Like the above, but with the additional condition that redaction will only +// be performed if 'cond' must be true. +#define KUDU_MAYBE_REDACT_IF(cond, expr) \ + ((KUDU_SHOULD_REDACT() && (cond)) ? kudu::kRedactionMessage : (expr)) + +//////////////////////////////////////// +// Redaction implementation details follow. +//////////////////////////////////////// + +namespace kudu { + +// Flag which allows redaction to be enabled or disabled for a thread context. +// Defaults to enabling redaction, since it's the safer default with respect to +// leaking user data, and it's easier to identify when data is over-redacted +// than vice-versa. +extern __thread bool tls_redact_user_data; + +// Redacted log messages are replaced with this constant. +extern const char* const kRedactionMessage; + +// Flag for checking if log redaction is enabled or disabled. +extern bool g_should_redact_log; + +// Flag for checking if flag redaction is enabled or disabled. +extern bool g_should_redact_flag; + +class ScopedDisableRedaction { + public: + ScopedDisableRedaction() + : old_val_(tls_redact_user_data) { + tls_redact_user_data = false; + } + + ~ScopedDisableRedaction() { + tls_redact_user_data = old_val_; + } + private: + bool old_val_; +}; + +} // namespace kudu + +//////////////////////////////////////////////////////////////////////////////// +// Throttled logging support +//////////////////////////////////////////////////////////////////////////////// + +// Logs a message throttled to appear at most once every 'n_secs' seconds to +// the given severity. +// +// The log message may include the special token 'THROTTLE_MSG' which expands +// to either an empty string or '[suppressed <n> similar messages]'. 
+
+// Example usage:
+// KLOG_EVERY_N_SECS(WARNING, 1) << "server is low on memory" << THROTTLE_MSG;
+//
+//
+// Advanced per-instance throttling
+// -----------------------------------
+// For cases where the throttling should be scoped to a given class instance,
+// you may define a logging::LogThrottler object and pass it to the
+// KLOG_EVERY_N_SECS_THROTTLER(...) macro. In addition, you must pass a "tag".
+// Only log messages with equal tags (by pointer equality) will be throttled.
+// For example:
+//
+// struct MyThing {
+// string name;
+// LogThrottler throttler;
+// };
+//
+// if (...) {
+// KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, my_thing->throttler, "coffee") <<
+// my_thing->name << " needs coffee!";
+// } else {
+// KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, my_thing->throttler, "wine") <<
+// my_thing->name << " needs wine!";
+// }
+//
+// In this example, the "coffee"-related message will be collapsed into other
+// such messages within the prior one second; however, if the state alternates
+// between the "coffee" message and the "wine" message, then each such alternation
+// will yield a message.
+
+#define KLOG_EVERY_N_SECS_THROTTLER(severity, n_secs, throttler, tag) \
+ int VARNAME_LINENUM(num_suppressed) = 0; \
+ if (throttler.ShouldLog(n_secs, tag, &VARNAME_LINENUM(num_suppressed))) \
+ google::LogMessage( \
+ __FILE__, __LINE__, google::GLOG_ ## severity, VARNAME_LINENUM(num_suppressed), \
+ &google::LogMessage::SendToLog).stream()
+
+#define KLOG_EVERY_N_SECS(severity, n_secs) \
+ static logging::LogThrottler LOG_THROTTLER; \
+ KLOG_EVERY_N_SECS_THROTTLER(severity, n_secs, LOG_THROTTLER, "no-tag")
+
+
+namespace kudu {
+enum PRIVATE_ThrottleMsg {THROTTLE_MSG};
+} // namespace kudu
+
+////////////////////////////////////////////////////////////////////////////////
+// Versions of glog macros for "LOG_EVERY" and "LOG_FIRST" that annotate the
+// benign races on their internal static variables. 
////////////////////////////////////////////////////////////////////////////////

// The "base" macros.
//
// NOTE: each of these macros declares function-local static counters, so each
// call site gets its own independent occurrence count. The counters are
// deliberately unsynchronized -- the races are annotated benign because
// "every N" logging is allowed to be approximate under concurrency.
#define KUDU_SOME_KIND_OF_LOG_EVERY_N(severity, n, what_to_do) \
  static int LOG_OCCURRENCES = 0, LOG_OCCURRENCES_MOD_N = 0; \
  ANNOTATE_BENIGN_RACE(&LOG_OCCURRENCES, "Logging every N is approximate"); \
  ANNOTATE_BENIGN_RACE(&LOG_OCCURRENCES_MOD_N, "Logging every N is approximate"); \
  ++LOG_OCCURRENCES; \
  if (++LOG_OCCURRENCES_MOD_N > n) LOG_OCCURRENCES_MOD_N -= n; \
  if (LOG_OCCURRENCES_MOD_N == 1) \
    google::LogMessage( \
        __FILE__, __LINE__, google::GLOG_ ## severity, LOG_OCCURRENCES, \
        &what_to_do).stream()

// Like the above, but only counts/logs when 'condition' also holds. The
// comparison against (1 % n) rather than a literal 1 makes n == 1 log on
// every occurrence (since the modulo result is then always 0 == 1 % 1).
#define KUDU_SOME_KIND_OF_LOG_IF_EVERY_N(severity, condition, n, what_to_do) \
  static int LOG_OCCURRENCES = 0, LOG_OCCURRENCES_MOD_N = 0; \
  ANNOTATE_BENIGN_RACE(&LOG_OCCURRENCES, "Logging every N is approximate"); \
  ANNOTATE_BENIGN_RACE(&LOG_OCCURRENCES_MOD_N, "Logging every N is approximate"); \
  ++LOG_OCCURRENCES; \
  if (condition && \
      ((LOG_OCCURRENCES_MOD_N=(LOG_OCCURRENCES_MOD_N + 1) % n) == (1 % n))) \
    google::LogMessage( \
        __FILE__, __LINE__, google::GLOG_ ## severity, LOG_OCCURRENCES, \
        &what_to_do).stream()

// Same as KUDU_SOME_KIND_OF_LOG_EVERY_N, but uses ErrnoLogMessage so the
// emitted message also carries the current errno description.
#define KUDU_SOME_KIND_OF_PLOG_EVERY_N(severity, n, what_to_do) \
  static int LOG_OCCURRENCES = 0, LOG_OCCURRENCES_MOD_N = 0; \
  ANNOTATE_BENIGN_RACE(&LOG_OCCURRENCES, "Logging every N is approximate"); \
  ANNOTATE_BENIGN_RACE(&LOG_OCCURRENCES_MOD_N, "Logging every N is approximate"); \
  ++LOG_OCCURRENCES; \
  if (++LOG_OCCURRENCES_MOD_N > n) LOG_OCCURRENCES_MOD_N -= n; \
  if (LOG_OCCURRENCES_MOD_N == 1) \
    google::ErrnoLogMessage( \
        __FILE__, __LINE__, google::GLOG_ ## severity, LOG_OCCURRENCES, \
        &what_to_do).stream()

// Logs only the first 'n' occurrences of this call site, then goes quiet.
#define KUDU_SOME_KIND_OF_LOG_FIRST_N(severity, n, what_to_do) \
  static uint64_t LOG_OCCURRENCES = 0; \
  ANNOTATE_BENIGN_RACE(&LOG_OCCURRENCES, "Logging the first N is approximate"); \
  if (LOG_OCCURRENCES++ < n) \
    google::LogMessage( \
        __FILE__, __LINE__, google::GLOG_ ## severity, LOG_OCCURRENCES, \
        &what_to_do).stream()

// The direct user-facing macros.

// Log at most one message per 'n' occurrences of this call site.
#define KLOG_EVERY_N(severity, n) \
  GOOGLE_GLOG_COMPILE_ASSERT(google::GLOG_ ## severity < \
                             google::NUM_SEVERITIES, \
                             INVALID_REQUESTED_LOG_SEVERITY); \
  KUDU_SOME_KIND_OF_LOG_EVERY_N(severity, (n), google::LogMessage::SendToLog)

// Same as KLOG_EVERY_N, but also sends the message to syslog.
#define KSYSLOG_EVERY_N(severity, n) \
  KUDU_SOME_KIND_OF_LOG_EVERY_N(severity, (n), google::LogMessage::SendToSyslogAndLog)

// Same as KLOG_EVERY_N, but appends the errno description (like PLOG).
#define KPLOG_EVERY_N(severity, n) \
  KUDU_SOME_KIND_OF_PLOG_EVERY_N(severity, (n), google::LogMessage::SendToLog)

// Log only the first 'n' occurrences of this call site.
#define KLOG_FIRST_N(severity, n) \
  KUDU_SOME_KIND_OF_LOG_FIRST_N(severity, (n), google::LogMessage::SendToLog)

// Log at most one message per 'n' occurrences for which 'condition' is true.
#define KLOG_IF_EVERY_N(severity, condition, n) \
  KUDU_SOME_KIND_OF_LOG_IF_EVERY_N(severity, (condition), (n), google::LogMessage::SendToLog)

// We also disable the un-annotated glog macros for anyone who includes this header.
// Using one of them after including this header becomes a compile-time error
// pointing at the K-prefixed replacement.
#undef LOG_EVERY_N
#define LOG_EVERY_N(severity, n) \
  GOOGLE_GLOG_COMPILE_ASSERT(false, "LOG_EVERY_N is deprecated. Please use KLOG_EVERY_N.")

#undef SYSLOG_EVERY_N
#define SYSLOG_EVERY_N(severity, n) \
  GOOGLE_GLOG_COMPILE_ASSERT(false, "SYSLOG_EVERY_N is deprecated. Please use KSYSLOG_EVERY_N.")

#undef PLOG_EVERY_N
#define PLOG_EVERY_N(severity, n) \
  GOOGLE_GLOG_COMPILE_ASSERT(false, "PLOG_EVERY_N is deprecated. Please use KPLOG_EVERY_N.")

#undef LOG_FIRST_N
#define LOG_FIRST_N(severity, n) \
  GOOGLE_GLOG_COMPILE_ASSERT(false, "LOG_FIRST_N is deprecated. Please use KLOG_FIRST_N.")

#undef LOG_IF_EVERY_N
#define LOG_IF_EVERY_N(severity, condition, n) \
  GOOGLE_GLOG_COMPILE_ASSERT(false, "LOG_IF_EVERY_N is deprecated. Please use KLOG_IF_EVERY_N.")

namespace kudu {

class Env;

// glog doesn't allow multiple invocations of InitGoogleLogging. This method conditionally
// calls InitGoogleLogging only if it hasn't been called before.
//
// It also takes care of installing the google failure signal handler and
// setting the signal handler for SIGPIPE to SIG_IGN.
void InitGoogleLoggingSafe(const char* arg);

// Like InitGoogleLoggingSafe() but stripped down: no signal handlers are
// installed, regular logging is disabled, and log events of any severity
// will be written to stderr.
//
// These properties make it attractive for use in libraries.
void InitGoogleLoggingSafeBasic(const char* arg);

// Demotes stderr logging to ERROR or higher and registers 'cb' as the
// recipient for all log events.
//
// Subsequent calls to RegisterLoggingCallback no-op (until the callback
// is unregistered with UnregisterLoggingCallback()).
void RegisterLoggingCallback(const LoggingCallback& cb);

// Unregisters a callback previously registered with
// RegisterLoggingCallback() and promotes stderr logging back to all
// severities.
//
// If no callback is registered, this is a no-op.
void UnregisterLoggingCallback();

// Returns the full pathname of the symlink to the most recent log
// file corresponding to this severity.
void GetFullLogFilename(google::LogSeverity severity, std::string* filename);

// Shuts down the google logging library. Call before exit to ensure that log files are
// flushed.
void ShutdownLoggingSafe();

// Deletes excess rotated log files.
//
// Keeps at most 'FLAG_max_log_files' of the most recent log files at every
// severity level, using the file's modified time to determine recency.
Status DeleteExcessLogFiles(Env* env);

namespace logging {

// A LogThrottler instance tracks the throttling state for a particular
// log message.
//
// This is used internally by KLOG_EVERY_N_SECS, but can also be used
// explicitly in conjunction with KLOG_EVERY_N_SECS_THROTTLER. See the
// macro descriptions above for details.
class LogThrottler {
 public:
  LogThrottler() : num_suppressed_(0), last_ts_(0), last_tag_(nullptr) {
    // The fields are intentionally accessed without synchronization; throttled
    // logging is allowed to be approximate, so the races are annotated benign.
    ANNOTATE_BENIGN_RACE_SIZED(this, sizeof(*this), "OK to be sloppy with log throttling");
  }

  // Returns true if the message identified by 'tag' should be logged now,
  // i.e. if at least 'n_secs' seconds have elapsed since this throttler last
  // allowed a message with the same tag (or if the tag changed).
  //
  // On return, '*num_suppressed' is set to the number of messages suppressed
  // since the last one that was allowed through.
  //
  // NOTE: tags are compared by pointer identity, not by string content --
  // presumably callers pass string literals so the same call site yields the
  // same pointer (TODO(review): confirm against the KLOG_EVERY_N_SECS macros).
  bool ShouldLog(int n_secs, const char* tag, int* num_suppressed) {
    MicrosecondsInt64 ts = GetMonoTimeMicros();

    // When we switch tags, we should not show the "suppressed" messages, because
    // in fact it's a different message that we skipped. So, reset it to zero,
    // and always log the new message.
    if (tag != last_tag_) {
      *num_suppressed = num_suppressed_ = 0;
      last_tag_ = tag;
      last_ts_ = ts;
      return true;
    }

    // Still within the throttle window: count the suppression and skip.
    if (ts - last_ts_ < n_secs * 1e6) {
      *num_suppressed = base::subtle::NoBarrier_AtomicIncrement(&num_suppressed_, 1);
      return false;
    }
    // Window expired: log, and atomically hand back-and-reset the suppressed count.
    last_ts_ = ts;
    *num_suppressed = base::subtle::NoBarrier_AtomicExchange(&num_suppressed_, 0);
    return true;
  }
 private:
  // Count of messages suppressed since the last allowed one.
  Atomic32 num_suppressed_;
  // Monotonic timestamp (microseconds) of the last allowed message.
  uint64_t last_ts_;
  // Tag of the last message seen (pointer identity; see ShouldLog()).
  const char* last_tag_;
};
} // namespace logging

std::ostream& operator<<(std::ostream &os, const PRIVATE_ThrottleMsg&);

// Convenience macros to prefix log messages with some prefix, these are the unlocked
// versions and should not obtain a lock (if one is required to obtain the prefix).
// There must be a LogPrefixUnlocked()/LogPrefixLocked() method available in the current
// scope in order to use these macros.
#define LOG_WITH_PREFIX_UNLOCKED(severity) LOG(severity) << LogPrefixUnlocked()
#define VLOG_WITH_PREFIX_UNLOCKED(verboselevel) LOG_IF(INFO, VLOG_IS_ON(verboselevel)) \
    << LogPrefixUnlocked()

// Same as the above, but obtain the lock.
#define LOG_WITH_PREFIX(severity) LOG(severity) << LogPrefix()
#define VLOG_WITH_PREFIX(verboselevel) LOG_IF(INFO, VLOG_IS_ON(verboselevel)) \
    << LogPrefix()

} // namespace kudu

#endif // KUDU_UTIL_LOGGING_H
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/logging_callback.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/logging_callback.h b/be/src/kudu/util/logging_callback.h
new file mode 100644
index 0000000..83fb973
--- /dev/null
+++ b/be/src/kudu/util/logging_callback.h
@@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef KUDU_UTIL_LOGGING_CALLBACK_H
#define KUDU_UTIL_LOGGING_CALLBACK_H

#include <ctime>
#include <string>

#include "kudu/gutil/callback_forward.h"

namespace kudu {

// Severity levels passed to a LoggingCallback. These mirror glog's severities
// but keep this header free of a glog dependency.
enum LogSeverity {
  SEVERITY_INFO,
  SEVERITY_WARNING,
  SEVERITY_ERROR,
  SEVERITY_FATAL
};

// Callback for simple logging.
//
// 'message' is NOT terminated with an endline.
typedef Callback<void(LogSeverity severity,
                      const char* filename,
                      int line_number,
                      const struct ::tm* time,
                      const char* message,
                      size_t message_len)> LoggingCallback;

} // namespace kudu

#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/logging_test_util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/logging_test_util.h b/be/src/kudu/util/logging_test_util.h
new file mode 100644
index 0000000..8102375
--- /dev/null
+++ b/be/src/kudu/util/logging_test_util.h
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef KUDU_LOGGING_TEST_UTIL_H
#define KUDU_LOGGING_TEST_UTIL_H

#include <glog/logging.h>
#include <string>
#include <vector>

namespace kudu {

// GLog sink that keeps an internal buffer of messages that have been logged.
+class StringVectorSink : public google::LogSink { + public: + void send(google::LogSeverity severity, const char* full_filename, + const char* base_filename, int line, + const struct ::tm* tm_time, + const char* message, size_t message_len) override { + logged_msgs_.push_back(ToString(severity, base_filename, line, + tm_time, message, message_len)); + } + + std::vector<std::string>& logged_msgs() { + return logged_msgs_; + } + + private: + std::vector<std::string> logged_msgs_; +}; + +// RAII wrapper around registering a LogSink with GLog. +struct ScopedRegisterSink { + explicit ScopedRegisterSink(google::LogSink* s) : s_(s) { + google::AddLogSink(s_); + } + ~ScopedRegisterSink() { + google::RemoveLogSink(s_); + } + + google::LogSink* s_; +}; + +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/maintenance_manager-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/maintenance_manager-test.cc b/be/src/kudu/util/maintenance_manager-test.cc new file mode 100644 index 0000000..a16213e --- /dev/null +++ b/be/src/kudu/util/maintenance_manager-test.cc @@ -0,0 +1,321 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
// See the License for the
// specific language governing permissions and limitations
// under the License.

#include <atomic>
#include <memory>
#include <mutex>
#include <vector>

#include <gflags/gflags.h>
#include <gtest/gtest.h>

#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/maintenance_manager.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/metrics.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
#include "kudu/util/thread.h"

using std::shared_ptr;
using std::vector;
using strings::Substitute;

// Metrics handed to the fake maintenance op defined below.
METRIC_DEFINE_entity(test);
METRIC_DEFINE_gauge_uint32(test, maintenance_ops_running,
                           "Number of Maintenance Operations Running",
                           kudu::MetricUnit::kMaintenanceOperations,
                           "The number of background maintenance operations currently running.");
METRIC_DEFINE_histogram(test, maintenance_op_duration,
                        "Maintenance Operation Duration",
                        kudu::MetricUnit::kSeconds, "", 60000000LU, 2);

DECLARE_int64(log_target_replay_size_mb);

namespace kudu {

// Number of completed operations the manager is configured to remember
// (exercised by TestCompletedOpsHistory).
static const int kHistorySize = 4;
static const char kFakeUuid[] = "12345";

// Fixture: brings up a MaintenanceManager with two worker threads and a very
// fast (1ms) polling interval, and lets individual tests fake memory pressure
// by flipping 'indicate_memory_pressure_'.
class MaintenanceManagerTest : public KuduTest {
 public:
  void SetUp() override {
    MaintenanceManager::Options options;
    options.num_threads = 2;
    options.polling_interval_ms = 1;
    options.history_size = kHistorySize;
    manager_.reset(new MaintenanceManager(options));
    // Route the manager's memory-pressure check through our test flag so
    // tests can toggle pressure at will. 'consumption' is left untouched.
    manager_->set_memory_pressure_func_for_tests(
        [&](double* consumption) {
          return indicate_memory_pressure_.load();
        });
    ASSERT_OK(manager_->Init(kFakeUuid));
  }

  void TearDown() override {
    manager_->Shutdown();
  }

 protected:
  shared_ptr<MaintenanceManager> manager_;
  // Read by the lambda installed in SetUp(); atomic because the manager's
  // worker threads evaluate it concurrently with the test body.
  std::atomic<bool> indicate_memory_pressure_ { false };
};

// Just create the MaintenanceManager and then shut it down, to make sure
// there are no race conditions there.
TEST_F(MaintenanceManagerTest, TestCreateAndShutdown) {
}

// Fake maintenance op whose reported stats (RAM anchored, logs retained,
// perf improvement) and behavior (run count, per-run sleep) are all
// adjustable by the test, with every field guarded by 'lock_'.
class TestMaintenanceOp : public MaintenanceOp {
 public:
  TestMaintenanceOp(const std::string& name,
                    IOUsage io_usage)
    : MaintenanceOp(name, io_usage),
      ram_anchored_(500),
      logs_retained_bytes_(0),
      perf_improvement_(0),
      metric_entity_(METRIC_ENTITY_test.Instantiate(&metric_registry_, "test")),
      maintenance_op_duration_(METRIC_maintenance_op_duration.Instantiate(metric_entity_)),
      maintenance_ops_running_(METRIC_maintenance_ops_running.Instantiate(metric_entity_, 0)),
      remaining_runs_(1),
      prepared_runs_(0),
      sleep_time_(MonoDelta::FromSeconds(0)) {
  }

  virtual ~TestMaintenanceOp() {}

  // Consumes one remaining run; returns false once the budget is exhausted,
  // which tells the manager not to schedule this op.
  virtual bool Prepare() OVERRIDE {
    std::lock_guard<Mutex> guard(lock_);
    if (remaining_runs_ == 0) {
      return false;
    }
    remaining_runs_--;
    prepared_runs_++;
    DLOG(INFO) << "Prepared op " << name();
    return true;
  }

  virtual void Perform() OVERRIDE {
    {
      std::lock_guard<Mutex> guard(lock_);
      DLOG(INFO) << "Performing op " << name();

      // Ensure that we don't call Perform() more times than we returned
      // success from Prepare().
      CHECK_GE(prepared_runs_, 1);
      prepared_runs_--;
    }

    // Sleep outside the lock so setters aren't blocked while we "work".
    SleepFor(sleep_time_);
  }

  virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE {
    std::lock_guard<Mutex> guard(lock_);
    stats->set_runnable(remaining_runs_ > 0);
    stats->set_ram_anchored(ram_anchored_);
    stats->set_logs_retained_bytes(logs_retained_bytes_);
    stats->set_perf_improvement(perf_improvement_);
  }

  // Sets how many more times Prepare() will succeed.
  void set_remaining_runs(int runs) {
    std::lock_guard<Mutex> guard(lock_);
    remaining_runs_ = runs;
  }

  // Sets how long each Perform() invocation sleeps.
  void set_sleep_time(MonoDelta time) {
    std::lock_guard<Mutex> guard(lock_);
    sleep_time_ = time;
  }

  void set_ram_anchored(uint64_t ram_anchored) {
    std::lock_guard<Mutex> guard(lock_);
    ram_anchored_ = ram_anchored;
  }

  void set_logs_retained_bytes(uint64_t logs_retained_bytes) {
    std::lock_guard<Mutex> guard(lock_);
    logs_retained_bytes_ = logs_retained_bytes;
  }

  void set_perf_improvement(uint64_t perf_improvement) {
    std::lock_guard<Mutex> guard(lock_);
    perf_improvement_ = perf_improvement;
  }

  virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE {
    return maintenance_op_duration_;
  }

  virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE {
    return maintenance_ops_running_;
  }

 private:
  // Guards every mutable field below.
  Mutex lock_;

  uint64_t ram_anchored_;
  uint64_t logs_retained_bytes_;
  uint64_t perf_improvement_;
  MetricRegistry metric_registry_;
  scoped_refptr<MetricEntity> metric_entity_;
  scoped_refptr<Histogram> maintenance_op_duration_;
  scoped_refptr<AtomicGauge<uint32_t> > maintenance_ops_running_;

  // The number of remaining times this operation will run before disabling
  // itself.
  int remaining_runs_;
  // The number of Prepared() operations which have not yet been Perform()ed.
  int prepared_runs_;

  // The amount of time each op invocation will sleep.
  MonoDelta sleep_time_;
};

// Create an op and wait for it to start running.
// Unregister it while it is
// running and verify that UnregisterOp waits for it to finish before
// proceeding.
TEST_F(MaintenanceManagerTest, TestRegisterUnregister) {
  TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE);
  op1.set_perf_improvement(10);
  // Register initially with no remaining runs. We'll later enable it once it's
  // already registered.
  op1.set_remaining_runs(0);
  manager_->RegisterOp(&op1);
  // Enable the op from a separate thread so that enabling races with the
  // manager's own scheduling loop.
  scoped_refptr<kudu::Thread> thread;
  CHECK_OK(Thread::Create(
      "TestThread", "TestRegisterUnregister",
      boost::bind(&TestMaintenanceOp::set_remaining_runs, &op1, 1), &thread));
  ASSERT_EVENTUALLY([&]() {
      ASSERT_EQ(op1.DurationHistogram()->TotalCount(), 1);
    });
  manager_->UnregisterOp(&op1);
  ThreadJoiner(thread.get()).Join();
}

// Regression test for KUDU-1495: when an operation is being unregistered,
// new instances of that operation should not be scheduled.
TEST_F(MaintenanceManagerTest, TestNewOpsDontGetScheduledDuringUnregister) {
  TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE);
  op1.set_perf_improvement(10);

  // Set the op to run up to 10 times, and each time should sleep for a second.
  op1.set_remaining_runs(10);
  op1.set_sleep_time(MonoDelta::FromSeconds(1));
  manager_->RegisterOp(&op1);

  // Wait until two instances of the ops start running, since we have two MM threads.
  ASSERT_EVENTUALLY([&]() {
      ASSERT_EQ(op1.RunningGauge()->value(), 2);
    });

  // Trigger Unregister while they are running. This should wait for the currently-
  // running operations to complete, but no new operations should be scheduled.
  manager_->UnregisterOp(&op1);

  // Hence, we should have run only the original two that we saw above.
  ASSERT_LE(op1.DurationHistogram()->TotalCount(), 2);
}

// Test that we'll run an operation that doesn't improve performance when memory
// pressure gets high.
TEST_F(MaintenanceManagerTest, TestMemoryPressure) {
  TestMaintenanceOp op("op", MaintenanceOp::HIGH_IO_USAGE);
  op.set_ram_anchored(100);
  manager_->RegisterOp(&op);

  // At first, we don't want to run this, since there is no perf_improvement.
  SleepFor(MonoDelta::FromMilliseconds(20));
  ASSERT_EQ(0, op.DurationHistogram()->TotalCount());

  // Fake that the server is under memory pressure.
  indicate_memory_pressure_ = true;

  // The op anchors RAM, so under pressure the manager should now pick it.
  ASSERT_EVENTUALLY([&]() {
      ASSERT_EQ(op.DurationHistogram()->TotalCount(), 1);
    });
  manager_->UnregisterOp(&op);
}

// Test that ops are prioritized correctly when we add log retention.
TEST_F(MaintenanceManagerTest, TestLogRetentionPrioritization) {
  const int64_t kMB = 1024 * 1024;

  // Shut down the background threads so FindBestOp() can be probed directly
  // without an op getting scheduled out from under us.
  manager_->Shutdown();

  TestMaintenanceOp op1("op1", MaintenanceOp::LOW_IO_USAGE);
  op1.set_ram_anchored(0);
  op1.set_logs_retained_bytes(100 * kMB);

  TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE);
  op2.set_ram_anchored(100);
  op2.set_logs_retained_bytes(100 * kMB);

  TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE);
  op3.set_ram_anchored(200);
  op3.set_logs_retained_bytes(100 * kMB);

  manager_->RegisterOp(&op1);
  manager_->RegisterOp(&op2);
  manager_->RegisterOp(&op3);

  // We want to do the low IO op first since it clears up some log retention.
  ASSERT_EQ(&op1, manager_->FindBestOp());

  manager_->UnregisterOp(&op1);

  // Low IO is taken care of, now we find the op that clears the most log retention and ram.
  // However, with the default settings, we won't bother running any of these operations
  // which only retain 100MB of logs.
  ASSERT_EQ(nullptr, manager_->FindBestOp());

  // If we change the target WAL size, we will select these ops.
  FLAGS_log_target_replay_size_mb = 50;
  ASSERT_EQ(&op3, manager_->FindBestOp());

  manager_->UnregisterOp(&op3);

  ASSERT_EQ(&op2, manager_->FindBestOp());

  manager_->UnregisterOp(&op2);
}

// Test adding operations and make sure that the history of recently completed operations
// is correct in that it wraps around and doesn't grow.
TEST_F(MaintenanceManagerTest, TestCompletedOpsHistory) {
  // Run one more op than the history can hold so the ring buffer wraps.
  for (int i = 0; i < 5; i++) {
    string name = Substitute("op$0", i);
    TestMaintenanceOp op(name, MaintenanceOp::HIGH_IO_USAGE);
    op.set_perf_improvement(1);
    op.set_ram_anchored(100);
    manager_->RegisterOp(&op);

    ASSERT_EVENTUALLY([&]() {
        ASSERT_EQ(op.DurationHistogram()->TotalCount(), 1);
      });
    manager_->UnregisterOp(&op);

    MaintenanceManagerStatusPB status_pb;
    manager_->GetMaintenanceManagerStatusDump(&status_pb);
    // The size should be at most the history_size.
    ASSERT_GE(kHistorySize, status_pb.completed_operations_size());
    // The most recently completed op should always be first, even if we wrap
    // around.
    ASSERT_EQ(name, status_pb.completed_operations(0).name());
  }
}

} // namespace kudu
