http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/threadpool.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/threadpool.h b/be/src/kudu/util/threadpool.h new file mode 100644 index 0000000..7ac6d17 --- /dev/null +++ b/be/src/kudu/util/threadpool.h @@ -0,0 +1,257 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_THREAD_POOL_H +#define KUDU_UTIL_THREAD_POOL_H + +#include <boost/function.hpp> +#include <gtest/gtest_prod.h> +#include <list> +#include <memory> +#include <unordered_set> +#include <string> +#include <vector> + +#include "kudu/gutil/callback_forward.h" +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/macros.h" +#include "kudu/gutil/port.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/util/condition_variable.h" +#include "kudu/util/monotime.h" +#include "kudu/util/mutex.h" +#include "kudu/util/status.h" + +namespace kudu { + +class Histogram; +class Thread; +class ThreadPool; +class Trace; + +class Runnable { + public: + virtual void Run() = 0; + virtual ~Runnable() {} +}; + +// ThreadPool takes a lot of arguments. We provide sane defaults with a builder. 
+// +// name: Used for debugging output and default names of the worker threads. +// Since thread names are limited to 16 characters on Linux, it's good to +// choose a short name here. +// Required. +// +// trace_metric_prefix: used to prefix the names of TraceMetric counters. +// When a task on a thread pool has an associated trace, the thread pool +// implementation will increment TraceMetric counters to indicate the +// amount of time spent waiting in the queue as well as the amount of wall +// and CPU time spent executing. By default, these counters are prefixed +// with the name of the thread pool. For example, if the pool is named +// 'apply', then counters such as 'apply.queue_time_us' will be +// incremented. +// +// The TraceMetrics implementation relies on the number of distinct counter +// names being small. Thus, if the thread pool name itself is dynamically +// generated, the default behavior described above would result in an +// unbounded number of distinct counter names. The 'trace_metric_prefix' +// setting can be used to override the prefix used in generating the trace +// metric names. +// +// For example, the Raft thread pools are named "<tablet id>-raft" which +// has unbounded cardinality (a server may have thousands of different +// tablet IDs over its lifetime). In that case, setting the prefix to +// "raft" will avoid any issues. +// +// min_threads: Minimum number of threads we'll have at any time. +// Default: 0. +// +// max_threads: Maximum number of threads we'll have at any time. +// Default: Number of CPUs detected on the system. +// +// max_queue_size: Maximum number of items to enqueue before returning a +// Status::ServiceUnavailable message from Submit(). +// Default: INT_MAX. +// +// idle_timeout: How long we'll keep around an idle thread before timing it out. +// We always keep at least min_threads. +// Default: 500 milliseconds. 
+// +class ThreadPoolBuilder { + public: + explicit ThreadPoolBuilder(std::string name); + + // Note: We violate the style guide by returning mutable references here + // in order to provide traditional Builder pattern conveniences. + ThreadPoolBuilder& set_trace_metric_prefix(const std::string& prefix); + ThreadPoolBuilder& set_min_threads(int min_threads); + ThreadPoolBuilder& set_max_threads(int max_threads); + ThreadPoolBuilder& set_max_queue_size(int max_queue_size); + ThreadPoolBuilder& set_idle_timeout(const MonoDelta& idle_timeout); + + const std::string& name() const { return name_; } + int min_threads() const { return min_threads_; } + int max_threads() const { return max_threads_; } + int max_queue_size() const { return max_queue_size_; } + const MonoDelta& idle_timeout() const { return idle_timeout_; } + + // Instantiate a new ThreadPool with the existing builder arguments. + Status Build(gscoped_ptr<ThreadPool>* pool) const; + + private: + friend class ThreadPool; + const std::string name_; + std::string trace_metric_prefix_; + int min_threads_; + int max_threads_; + int max_queue_size_; + MonoDelta idle_timeout_; + + DISALLOW_COPY_AND_ASSIGN(ThreadPoolBuilder); +}; + +// Thread pool with a variable number of threads. +// The pool can execute a class that implements the Runnable interface, or a +// boost::function, which can be obtained via boost::bind(). +// +// Usage Example: +// static void Func(int n) { ... } +// class Task : public Runnable { ... } +// +// gscoped_ptr<ThreadPool> thread_pool; +// CHECK_OK( +// ThreadPoolBuilder("my_pool") +// .set_min_threads(0) +// .set_max_threads(5) +// .set_max_queue_size(10) +// .set_idle_timeout(MonoDelta::FromMilliseconds(2000)) +// .Build(&thread_pool)); +// CHECK_OK(thread_pool->Submit(shared_ptr<Runnable>(new Task()))); +// CHECK_OK(thread_pool->SubmitFunc(boost::bind(&Func, 10))); +class ThreadPool { + public: + ~ThreadPool(); + + // Wait for the running tasks to complete and then shut down the threads. 
+ // All the other pending tasks in the queue will be removed. + // NOTE: The user may implement external abort logic for the + // runnables, which must be called before Shutdown(), if the system + // should know about the non-execution of these tasks, or if the runnables + // require an explicit "abort" notification to exit from the run loop. + void Shutdown(); + + // Submit a function using the kudu Closure system. + Status SubmitClosure(const Closure& task) WARN_UNUSED_RESULT; + + // Submit a function bound using boost::bind(&FuncName, args...) + Status SubmitFunc(boost::function<void()> func) WARN_UNUSED_RESULT; + + // Submit an instance of a class implementing Runnable. + Status Submit(std::shared_ptr<Runnable> task) WARN_UNUSED_RESULT; + + // Wait until all the tasks are completed. + void Wait(); + + // Waits for the pool to reach the idle state, or until 'until' time is reached. + // Returns true if the pool reached the idle state, false otherwise. + bool WaitUntil(const MonoTime& until); + + // Waits for the pool to reach the idle state, or until 'delta' time elapses. + // Returns true if the pool reached the idle state, false otherwise. + bool WaitFor(const MonoDelta& delta); + + // Return the current number of tasks waiting in the queue. + // Typically used for metrics. + int queue_length() const { + return ANNOTATE_UNPROTECTED_READ(queue_size_); + } + + // Attach a histogram which measures the queue length seen by tasks when they enter + // the thread pool's queue. + void SetQueueLengthHistogram(const scoped_refptr<Histogram>& hist); + + // Attach a histogram which measures the amount of time that tasks spend waiting in + // the queue. + void SetQueueTimeMicrosHistogram(const scoped_refptr<Histogram>& hist); + + // Attach a histogram which measures the amount of time that tasks spend running. + void SetRunTimeMicrosHistogram(const scoped_refptr<Histogram>& hist); + + private: + friend class ThreadPoolBuilder; + + // Create a new thread pool using a builder. 
+ explicit ThreadPool(const ThreadPoolBuilder& builder); + + // Initialize the thread pool by starting the minimum number of threads. + Status Init(); + + // Dispatcher responsible for dequeueing and executing the tasks. + void DispatchThread(bool permanent); + + // Create a new thread. Requires that lock_ is held. + Status CreateThreadUnlocked(); + + // Aborts if the current thread is a member of this thread pool. + void CheckNotPoolThreadUnlocked(); + + private: + FRIEND_TEST(ThreadPoolTest, TestThreadPoolWithNoMinimum); + FRIEND_TEST(ThreadPoolTest, TestVariableSizeThreadPool); + + struct QueueEntry { + std::shared_ptr<Runnable> runnable; + Trace* trace; + + // Time at which the entry was submitted to the pool. + MonoTime submit_time; + }; + + const std::string name_; + const int min_threads_; + const int max_threads_; + const int max_queue_size_; + const MonoDelta idle_timeout_; + + Status pool_status_; + Mutex lock_; + ConditionVariable idle_cond_; + ConditionVariable no_threads_cond_; + ConditionVariable not_empty_; + int num_threads_; + int active_threads_; + int queue_size_; + std::list<QueueEntry> queue_; + + // Pointers to all running threads. Raw pointers are safe because a Thread + // may only go out of scope after being removed from threads_. + // + // Protected by lock_. + std::unordered_set<Thread*> threads_; + + scoped_refptr<Histogram> queue_length_histogram_; + scoped_refptr<Histogram> queue_time_us_histogram_; + scoped_refptr<Histogram> run_time_us_histogram_; + + const char* queue_time_trace_metric_name_; + const char* run_wall_time_trace_metric_name_; + const char* run_cpu_time_trace_metric_name_; + + DISALLOW_COPY_AND_ASSIGN(ThreadPool); +}; + +} // namespace kudu +#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/throttler-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/throttler-test.cc b/be/src/kudu/util/throttler-test.cc new file mode 100644 index 0000000..57563d2 --- /dev/null +++ b/be/src/kudu/util/throttler-test.cc @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/throttler.h" + +#include <algorithm> +#include <cmath> +#include <cstring> + +#include "kudu/util/test_util.h" + +namespace kudu { + +class ThrottlerTest : public KuduTest { +}; + +TEST_F(ThrottlerTest, TestOpThrottle) { + // Check operation rate throttling + MonoTime now = MonoTime::Now(); + Throttler t0(now, 1000, 1000*1000, 1); + // Fill up bucket + now += MonoDelta::FromMilliseconds(2000); + // Check throttle behavior for 1 second. 
+ for (int p = 0; p < 10; p++) { + for (int i = 0; i < 100; i++) { + ASSERT_TRUE(t0.Take(now, 1, 1)); + } + ASSERT_FALSE(t0.Take(now, 1, 1)); + now += MonoDelta::FromMilliseconds(100); + } +} + +TEST_F(ThrottlerTest, TestIOThrottle) { + // Check IO rate throttling + MonoTime now = MonoTime::Now(); + Throttler t0(now, 50000, 1000*1000, 1); + // Fill up bucket + now += MonoDelta::FromMilliseconds(2000); + // Check throttle behavior for 1 second. + for (int p = 0; p < 10; p++) { + for (int i = 0; i < 100; i++) { + ASSERT_TRUE(t0.Take(now, 1, 1000)); + } + ASSERT_FALSE(t0.Take(now, 1, 1000)); + now += MonoDelta::FromMilliseconds(100); + } +} + +TEST_F(ThrottlerTest, TestBurst) { + // Check IO rate throttling with a burst factor of 5 + MonoTime now = MonoTime::Now(); + Throttler t0(now, 2000, 1000*1000, 5); + // Fill up bucket + now += MonoDelta::FromMilliseconds(2000); + for (int i = 0; i < 100; i++) { + now += MonoDelta::FromMilliseconds(1); + ASSERT_TRUE(t0.Take(now, 1, 5000)); + } + ASSERT_TRUE(t0.Take(now, 1, 100000)); + ASSERT_FALSE(t0.Take(now, 1, 1)); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/throttler.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/throttler.cc b/be/src/kudu/util/throttler.cc new file mode 100644 index 0000000..108d5db --- /dev/null +++ b/be/src/kudu/util/throttler.cc @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/throttler.h" + +#include <mutex> + +namespace kudu { + +Throttler::Throttler(MonoTime now, uint64_t op_rate, uint64_t byte_rate, double burst_factor) : + next_refill_(now) { + op_refill_ = op_rate / (MonoTime::kMicrosecondsPerSecond / kRefillPeriodMicros); + op_token_ = 0; + op_token_max_ = static_cast<uint64_t>(op_refill_ * burst_factor); + byte_refill_ = byte_rate / (MonoTime::kMicrosecondsPerSecond / kRefillPeriodMicros); + byte_token_ = 0; + byte_token_max_ = static_cast<uint64_t>(byte_refill_ * burst_factor); +} + +bool Throttler::Take(MonoTime now, uint64_t op, uint64_t byte) { + if (op_refill_ == 0 && byte_refill_ == 0) { + return true; + } + std::lock_guard<simple_spinlock> lock(lock_); + Refill(now); + if ((op_refill_ == 0 || op <= op_token_) && + (byte_refill_ == 0 || byte <= byte_token_)) { + if (op_refill_ > 0) { + op_token_ -= op; + } + if (byte_refill_ > 0) { + byte_token_ -= byte; + } + return true; + } + return false; +} + +void Throttler::Refill(MonoTime now) { + int64_t d = (now - next_refill_).ToMicroseconds(); + if (d < 0) { + return; + } + uint64_t num_period = d / kRefillPeriodMicros + 1; + next_refill_ += MonoDelta::FromMicroseconds(num_period * kRefillPeriodMicros); + op_token_ += num_period * op_refill_; + op_token_ = std::min(op_token_, op_token_max_); + byte_token_ += num_period * byte_refill_; + byte_token_ = std::min(byte_token_, byte_token_max_); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/throttler.h 
---------------------------------------------------------------------- diff --git a/be/src/kudu/util/throttler.h b/be/src/kudu/util/throttler.h new file mode 100644 index 0000000..7be7ba1 --- /dev/null +++ b/be/src/kudu/util/throttler.h @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_THROTTLER_H +#define KUDU_UTIL_THROTTLER_H + +#include <algorithm> + +#include "kudu/gutil/macros.h" +#include "kudu/util/locks.h" +#include "kudu/util/monotime.h" + +namespace kudu { + +// A throttler to throttle both operation/s and IO byte/s. +class Throttler { + public: + // Refill period is 100ms. + enum { + kRefillPeriodMicros = 100000 + }; + + // Construct a throttler with max operation per second, max IO bytes per second + // and burst factor (burst_rate = rate * burst), burst rate means maximum + // throughput within one refill period. + // Set op_per_sec to 0 to disable operation throttling. + // Set byte_per_sec to 0 to disable IO bytes throttling. + Throttler(MonoTime now, uint64_t op_per_sec, uint64_t byte_per_sec, double burst_factor); + + // Throttle an "operation group" by taking 'op' operation tokens and 'byte' byte tokens. 
+ // Return true if there are enough tokens, and operation is allowed. + // Return false if there are not enough tokens, and operation is throttled. + bool Take(MonoTime now, uint64_t op, uint64_t byte); + + private: + void Refill(MonoTime now); + + MonoTime next_refill_; + uint64_t op_refill_; + uint64_t op_token_; + uint64_t op_token_max_; + uint64_t byte_refill_; + uint64_t byte_token_; + uint64_t byte_token_max_; + simple_spinlock lock_; +}; + +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/trace-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/trace-test.cc b/be/src/kudu/util/trace-test.cc new file mode 100644 index 0000000..6c38b0c --- /dev/null +++ b/be/src/kudu/util/trace-test.cc @@ -0,0 +1,845 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <gtest/gtest.h> +#include <glog/stl_logging.h> +#include <string> +#include <rapidjson/document.h> +#include <rapidjson/rapidjson.h> + +#include "kudu/util/trace.h" +#include "kudu/util/debug/trace_event.h" +#include "kudu/util/debug/trace_event_synthetic_delay.h" +#include "kudu/util/debug/trace_logging.h" +#include "kudu/util/stopwatch.h" +#include "kudu/util/test_util.h" + +using kudu::debug::TraceLog; +using kudu::debug::TraceResultBuffer; +using kudu::debug::CategoryFilter; +using rapidjson::Document; +using rapidjson::Value; +using std::string; +using std::vector; + +namespace kudu { + +class TraceTest : public KuduTest { +}; + +// Replace all digits in 's' with the character 'X'. +static string XOutDigits(const string& s) { + string ret; + ret.reserve(s.size()); + for (char c : s) { + if (isdigit(c)) { + ret.push_back('X'); + } else { + ret.push_back(c); + } + } + return ret; +} + +TEST_F(TraceTest, TestBasic) { + scoped_refptr<Trace> t(new Trace); + TRACE_TO(t, "hello $0, $1", "world", 12345); + TRACE_TO(t, "goodbye $0, $1", "cruel world", 54321); + + string result = XOutDigits(t->DumpToString(Trace::NO_FLAGS)); + ASSERT_EQ("XXXX XX:XX:XX.XXXXXX trace-test.cc:XX] hello world, XXXXX\n" + "XXXX XX:XX:XX.XXXXXX trace-test.cc:XX] goodbye cruel world, XXXXX\n", + result); +} + +TEST_F(TraceTest, TestAttach) { + scoped_refptr<Trace> traceA(new Trace); + scoped_refptr<Trace> traceB(new Trace); + { + ADOPT_TRACE(traceA.get()); + EXPECT_EQ(traceA.get(), Trace::CurrentTrace()); + { + ADOPT_TRACE(traceB.get()); + EXPECT_EQ(traceB.get(), Trace::CurrentTrace()); + TRACE("hello from traceB"); + } + EXPECT_EQ(traceA.get(), Trace::CurrentTrace()); + TRACE("hello from traceA"); + } + EXPECT_TRUE(Trace::CurrentTrace() == nullptr); + TRACE("this goes nowhere"); + + EXPECT_EQ(XOutDigits(traceA->DumpToString(Trace::NO_FLAGS)), + "XXXX XX:XX:XX.XXXXXX trace-test.cc:XX] hello from traceA\n"); + EXPECT_EQ(XOutDigits(traceB->DumpToString(Trace::NO_FLAGS)), + "XXXX 
XX:XX:XX.XXXXXX trace-test.cc:XX] hello from traceB\n"); +} + +TEST_F(TraceTest, TestChildTrace) { + scoped_refptr<Trace> traceA(new Trace); + scoped_refptr<Trace> traceB(new Trace); + ADOPT_TRACE(traceA.get()); + traceA->AddChildTrace("child", traceB.get()); + TRACE("hello from traceA"); + { + ADOPT_TRACE(traceB.get()); + TRACE("hello from traceB"); + } + EXPECT_EQ(XOutDigits(traceA->DumpToString(Trace::NO_FLAGS)), + "XXXX XX:XX:XX.XXXXXX trace-test.cc:XX] hello from traceA\n" + "Related trace 'child':\n" + "XXXX XX:XX:XX.XXXXXX trace-test.cc:XXX] hello from traceB\n"); +} + +static void GenerateTraceEvents(int thread_id, + int num_events) { + for (int i = 0; i < num_events; i++) { + TRACE_EVENT1("test", "foo", "thread_id", thread_id); + } +} + +// Parse the dumped trace data and return the number of events +// found within, including only those with the "test" category. +int ParseAndReturnEventCount(const string& trace_json) { + Document d; + d.Parse<0>(trace_json.c_str()); + CHECK(d.IsObject()) << "bad json: " << trace_json; + const Value& events_json = d["traceEvents"]; + CHECK(events_json.IsArray()) << "bad json: " << trace_json; + + // Count how many of our events were seen. We have to filter out + // the metadata events. + int seen_real_events = 0; + for (int i = 0; i < events_json.Size(); i++) { + if (events_json[i]["cat"].GetString() == string("test")) { + seen_real_events++; + } + } + + return seen_real_events; +} + +TEST_F(TraceTest, TestChromeTracing) { + const int kNumThreads = 4; + const int kEventsPerThread = AllowSlowTests() ? 
1000000 : 10000; + + TraceLog* tl = TraceLog::GetInstance(); + tl->SetEnabled(CategoryFilter(CategoryFilter::kDefaultCategoryFilterString), + TraceLog::RECORDING_MODE, + TraceLog::RECORD_CONTINUOUSLY); + + vector<scoped_refptr<Thread> > threads(kNumThreads); + + Stopwatch s; + s.start(); + for (int i = 0; i < kNumThreads; i++) { + CHECK_OK(Thread::Create("test", "gen-traces", &GenerateTraceEvents, i, kEventsPerThread, + &threads[i])); + } + + for (int i = 0; i < kNumThreads; i++) { + threads[i]->Join(); + } + tl->SetDisabled(); + + int total_events = kNumThreads * kEventsPerThread; + double elapsed = s.elapsed().wall_seconds(); + + LOG(INFO) << "Trace performance: " << static_cast<int>(total_events / elapsed) << " traces/sec"; + + string trace_json = TraceResultBuffer::FlushTraceLogToString(); + + // Verify that the JSON contains events. It won't have exactly + // kEventsPerThread * kNumThreads because the trace buffer isn't large enough + // for that. + ASSERT_GE(ParseAndReturnEventCount(trace_json), 100); +} + +// Test that, if a thread exits before filling a full trace buffer, we still +// see its results. This is a regression test for a bug in the earlier integration +// of Chromium tracing into Kudu. +TEST_F(TraceTest, TestTraceFromExitedThread) { + TraceLog* tl = TraceLog::GetInstance(); + tl->SetEnabled(CategoryFilter(CategoryFilter::kDefaultCategoryFilterString), + TraceLog::RECORDING_MODE, + TraceLog::RECORD_CONTINUOUSLY); + + // Generate 10 trace events in a separate thread. 
+ int kNumEvents = 10; + scoped_refptr<Thread> t; + CHECK_OK(Thread::Create("test", "gen-traces", &GenerateTraceEvents, 1, kNumEvents, + &t)); + t->Join(); + tl->SetDisabled(); + string trace_json = TraceResultBuffer::FlushTraceLogToString(); + LOG(INFO) << trace_json; + + // Verify that the buffer contains 10 trace events + ASSERT_EQ(10, ParseAndReturnEventCount(trace_json)); +} + +static void GenerateWideSpan() { + TRACE_EVENT0("test", "GenerateWideSpan"); + for (int i = 0; i < 1000; i++) { + TRACE_EVENT0("test", "InnerLoop"); + } +} + +// Test creating a trace event which contains many other trace events. +// This ensures that we can go back and update a TraceEvent which fell in +// a different trace chunk. +TEST_F(TraceTest, TestWideSpan) { + TraceLog* tl = TraceLog::GetInstance(); + tl->SetEnabled(CategoryFilter(CategoryFilter::kDefaultCategoryFilterString), + TraceLog::RECORDING_MODE, + TraceLog::RECORD_CONTINUOUSLY); + + scoped_refptr<Thread> t; + CHECK_OK(Thread::Create("test", "gen-traces", &GenerateWideSpan, &t)); + t->Join(); + tl->SetDisabled(); + + string trace_json = TraceResultBuffer::FlushTraceLogToString(); + ASSERT_EQ(1001, ParseAndReturnEventCount(trace_json)); +} + +// Regression test for KUDU-753: faulty JSON escaping when dealing with +// single quote characters. +TEST_F(TraceTest, TestJsonEncodingString) { + TraceLog* tl = TraceLog::GetInstance(); + tl->SetEnabled(CategoryFilter(CategoryFilter::kDefaultCategoryFilterString), + TraceLog::RECORDING_MODE, + TraceLog::RECORD_CONTINUOUSLY); + { + TRACE_EVENT1("test", "test", "arg", "this is a test with \"'\"' and characters\nand new lines"); + } + tl->SetDisabled(); + string trace_json = TraceResultBuffer::FlushTraceLogToString(); + ASSERT_EQ(1, ParseAndReturnEventCount(trace_json)); +} + +// Generate trace events continuously until 'latch' fires. +// Increment *num_events_generated for each event generated. 
+void GenerateTracesUntilLatch(AtomicInt<int64_t>* num_events_generated, + CountDownLatch* latch) { + while (latch->count()) { + { + // This goes in its own scope so that the event is fully generated (with + // both its START and END times) before we do the counter increment below. + TRACE_EVENT0("test", "GenerateTracesUntilLatch"); + } + num_events_generated->Increment(); + } +} + +// Test starting and stopping tracing while a thread is running. +// This is a regression test for bugs in earlier versions of the imported +// trace code. +TEST_F(TraceTest, TestStartAndStopCollection) { + TraceLog* tl = TraceLog::GetInstance(); + + CountDownLatch latch(1); + AtomicInt<int64_t> num_events_generated(0); + scoped_refptr<Thread> t; + CHECK_OK(Thread::Create("test", "gen-traces", &GenerateTracesUntilLatch, + &num_events_generated, &latch, &t)); + + const int num_flushes = AllowSlowTests() ? 50 : 3; + for (int i = 0; i < num_flushes; i++) { + tl->SetEnabled(CategoryFilter(CategoryFilter::kDefaultCategoryFilterString), + TraceLog::RECORDING_MODE, + TraceLog::RECORD_CONTINUOUSLY); + + const int64_t num_events_before = num_events_generated.Load(); + SleepFor(MonoDelta::FromMilliseconds(10)); + const int64_t num_events_after = num_events_generated.Load(); + tl->SetDisabled(); + + string trace_json = TraceResultBuffer::FlushTraceLogToString(); + // We might under-count the number of events, since we only measure the sleep, + // and tracing is enabled before and disabled after we start counting. + // We might also over-count by at most 1, because we could enable tracing + // right in between creating a trace event and incrementing the counter. + // But, we should never over-count by more than 1. 
+ int expected_events_lowerbound = num_events_after - num_events_before - 1; + int captured_events = ParseAndReturnEventCount(trace_json); + ASSERT_GE(captured_events, expected_events_lowerbound); + } + + latch.CountDown(); + t->Join(); +} + +TEST_F(TraceTest, TestChromeSampling) { + TraceLog* tl = TraceLog::GetInstance(); + tl->SetEnabled(CategoryFilter(CategoryFilter::kDefaultCategoryFilterString), + TraceLog::RECORDING_MODE, + static_cast<TraceLog::Options>(TraceLog::RECORD_CONTINUOUSLY | + TraceLog::ENABLE_SAMPLING)); + + for (int i = 0; i < 100; i++) { + switch (i % 3) { + case 0: + TRACE_EVENT_SET_SAMPLING_STATE("test", "state-0"); + break; + case 1: + TRACE_EVENT_SET_SAMPLING_STATE("test", "state-1"); + break; + case 2: + TRACE_EVENT_SET_SAMPLING_STATE("test", "state-2"); + break; + } + SleepFor(MonoDelta::FromMilliseconds(1)); + } + tl->SetDisabled(); + string trace_json = TraceResultBuffer::FlushTraceLogToString(); + ASSERT_GT(ParseAndReturnEventCount(trace_json), 0); +} + +class TraceEventCallbackTest : public KuduTest { + public: + virtual void SetUp() OVERRIDE { + KuduTest::SetUp(); + ASSERT_EQ(nullptr, s_instance); + s_instance = this; + } + virtual void TearDown() OVERRIDE { + TraceLog::GetInstance()->SetDisabled(); + + // Flush the buffer so that one test doesn't end up leaving any + // extra results for the next test. + TraceResultBuffer::FlushTraceLogToString(); + + ASSERT_TRUE(!!s_instance); + s_instance = nullptr; + KuduTest::TearDown(); + + } + + protected: + void EndTraceAndFlush() { + TraceLog::GetInstance()->SetDisabled(); + string trace_json = TraceResultBuffer::FlushTraceLogToString(); + trace_doc_.Parse<0>(trace_json.c_str()); + LOG(INFO) << trace_json; + ASSERT_TRUE(trace_doc_.IsObject()); + trace_parsed_ = trace_doc_["traceEvents"]; + ASSERT_TRUE(trace_parsed_.IsArray()); + } + + void DropTracedMetadataRecords() { + // NB: rapidjson has move-semantics, like auto_ptr. 
+ Value old_trace_parsed; + old_trace_parsed = trace_parsed_; + trace_parsed_.SetArray(); + size_t old_trace_parsed_size = old_trace_parsed.Size(); + + for (size_t i = 0; i < old_trace_parsed_size; i++) { + Value value; + value = old_trace_parsed[i]; + if (value.GetType() != rapidjson::kObjectType) { + trace_parsed_.PushBack(value, trace_doc_.GetAllocator()); + continue; + } + string tmp; + if (value.HasMember("ph") && strcmp(value["ph"].GetString(), "M") == 0) { + continue; + } + + trace_parsed_.PushBack(value, trace_doc_.GetAllocator()); + } + } + + // Search through the given array for any dictionary which has a key + // or value which has 'string_to_match' as a substring. + // Returns the matching dictionary, or NULL. + static const Value* FindTraceEntry( + const Value& trace_parsed, + const char* string_to_match) { + // Scan all items + size_t trace_parsed_count = trace_parsed.Size(); + for (size_t i = 0; i < trace_parsed_count; i++) { + const Value& value = trace_parsed[i]; + if (value.GetType() != rapidjson::kObjectType) { + continue; + } + + for (Value::ConstMemberIterator it = value.MemberBegin(); + it != value.MemberEnd(); + ++it) { + if (it->name.IsString() && strstr(it->name.GetString(), string_to_match) != nullptr) { + return &value; + } + if (it->value.IsString() && strstr(it->value.GetString(), string_to_match) != nullptr) { + return &value; + } + } + } + return nullptr; + } + + // For TraceEventCallbackAndRecordingX tests. + void VerifyCallbackAndRecordedEvents(size_t expected_callback_count, + size_t expected_recorded_count) { + // Callback events. + EXPECT_EQ(expected_callback_count, collected_events_names_.size()); + for (size_t i = 0; i < collected_events_names_.size(); ++i) { + EXPECT_EQ("callback", collected_events_categories_[i]); + EXPECT_EQ("yes", collected_events_names_[i]); + } + + // Recorded events. 
+ EXPECT_EQ(expected_recorded_count, trace_parsed_.Size()); + EXPECT_TRUE(FindTraceEntry(trace_parsed_, "recording")); + EXPECT_FALSE(FindTraceEntry(trace_parsed_, "callback")); + EXPECT_TRUE(FindTraceEntry(trace_parsed_, "yes")); + EXPECT_FALSE(FindTraceEntry(trace_parsed_, "no")); + } + + void VerifyCollectedEvent(size_t i, + unsigned phase, + const string& category, + const string& name) { + EXPECT_EQ(phase, collected_events_phases_[i]); + EXPECT_EQ(category, collected_events_categories_[i]); + EXPECT_EQ(name, collected_events_names_[i]); + } + + Document trace_doc_; + Value trace_parsed_; + + vector<string> collected_events_categories_; + vector<string> collected_events_names_; + vector<unsigned char> collected_events_phases_; + vector<MicrosecondsInt64> collected_events_timestamps_; + + static TraceEventCallbackTest* s_instance; + static void Callback(MicrosecondsInt64 timestamp, + char phase, + const unsigned char* category_group_enabled, + const char* name, + uint64_t id, + int num_args, + const char* const arg_names[], + const unsigned char arg_types[], + const uint64_t arg_values[], + unsigned char flags) { + s_instance->collected_events_phases_.push_back(phase); + s_instance->collected_events_categories_.push_back( + TraceLog::GetCategoryGroupName(category_group_enabled)); + s_instance->collected_events_names_.push_back(name); + s_instance->collected_events_timestamps_.push_back(timestamp); + } +}; + +TraceEventCallbackTest* TraceEventCallbackTest::s_instance; + +TEST_F(TraceEventCallbackTest, TraceEventCallback) { + TRACE_EVENT_INSTANT0("all", "before enable", TRACE_EVENT_SCOPE_THREAD); + TraceLog::GetInstance()->SetEventCallbackEnabled( + CategoryFilter("*"), Callback); + TRACE_EVENT_INSTANT0("all", "event1", TRACE_EVENT_SCOPE_GLOBAL); + TRACE_EVENT_INSTANT0("all", "event2", TRACE_EVENT_SCOPE_GLOBAL); + { + TRACE_EVENT0("all", "duration"); + TRACE_EVENT_INSTANT0("all", "event3", TRACE_EVENT_SCOPE_GLOBAL); + } + 
TraceLog::GetInstance()->SetEventCallbackDisabled(); + TRACE_EVENT_INSTANT0("all", "after callback removed", + TRACE_EVENT_SCOPE_GLOBAL); + ASSERT_EQ(5u, collected_events_names_.size()); + EXPECT_EQ("event1", collected_events_names_[0]); + EXPECT_EQ(TRACE_EVENT_PHASE_INSTANT, collected_events_phases_[0]); + EXPECT_EQ("event2", collected_events_names_[1]); + EXPECT_EQ(TRACE_EVENT_PHASE_INSTANT, collected_events_phases_[1]); + EXPECT_EQ("duration", collected_events_names_[2]); + EXPECT_EQ(TRACE_EVENT_PHASE_BEGIN, collected_events_phases_[2]); + EXPECT_EQ("event3", collected_events_names_[3]); + EXPECT_EQ(TRACE_EVENT_PHASE_INSTANT, collected_events_phases_[3]); + EXPECT_EQ("duration", collected_events_names_[4]); + EXPECT_EQ(TRACE_EVENT_PHASE_END, collected_events_phases_[4]); + for (size_t i = 1; i < collected_events_timestamps_.size(); i++) { + EXPECT_LE(collected_events_timestamps_[i - 1], + collected_events_timestamps_[i]); + } +} + +TEST_F(TraceEventCallbackTest, TraceEventCallbackWhileFull) { + TraceLog::GetInstance()->SetEnabled( + CategoryFilter("*"), + TraceLog::RECORDING_MODE, + TraceLog::RECORD_UNTIL_FULL); + do { + TRACE_EVENT_INSTANT0("all", "badger badger", TRACE_EVENT_SCOPE_GLOBAL); + } while (!TraceLog::GetInstance()->BufferIsFull()); + TraceLog::GetInstance()->SetEventCallbackEnabled(CategoryFilter("*"), + Callback); + TRACE_EVENT_INSTANT0("all", "a snake", TRACE_EVENT_SCOPE_GLOBAL); + TraceLog::GetInstance()->SetEventCallbackDisabled(); + ASSERT_EQ(1u, collected_events_names_.size()); + EXPECT_EQ("a snake", collected_events_names_[0]); +} + +// 1: Enable callback, enable recording, disable callback, disable recording. 
+TEST_F(TraceEventCallbackTest, TraceEventCallbackAndRecording1) { + TRACE_EVENT_INSTANT0("recording", "no", TRACE_EVENT_SCOPE_GLOBAL); + TRACE_EVENT_INSTANT0("callback", "no", TRACE_EVENT_SCOPE_GLOBAL); + TraceLog::GetInstance()->SetEventCallbackEnabled(CategoryFilter("callback"), + Callback); + TRACE_EVENT_INSTANT0("recording", "no", TRACE_EVENT_SCOPE_GLOBAL); + TRACE_EVENT_INSTANT0("callback", "yes", TRACE_EVENT_SCOPE_GLOBAL); + TraceLog::GetInstance()->SetEnabled( + CategoryFilter("recording"), + TraceLog::RECORDING_MODE, + TraceLog::RECORD_UNTIL_FULL); + TRACE_EVENT_INSTANT0("recording", "yes", TRACE_EVENT_SCOPE_GLOBAL); + TRACE_EVENT_INSTANT0("callback", "yes", TRACE_EVENT_SCOPE_GLOBAL); + TraceLog::GetInstance()->SetEventCallbackDisabled(); + TRACE_EVENT_INSTANT0("recording", "yes", TRACE_EVENT_SCOPE_GLOBAL); + TRACE_EVENT_INSTANT0("callback", "no", TRACE_EVENT_SCOPE_GLOBAL); + EndTraceAndFlush(); + TRACE_EVENT_INSTANT0("recording", "no", TRACE_EVENT_SCOPE_GLOBAL); + TRACE_EVENT_INSTANT0("callback", "no", TRACE_EVENT_SCOPE_GLOBAL); + + DropTracedMetadataRecords(); + ASSERT_NO_FATAL_FAILURE(); + VerifyCallbackAndRecordedEvents(2, 2); +} + +// 2: Enable callback, enable recording, disable recording, disable callback. 
// Events are named "yes" when the sink matching their category ("callback"
// or "recording") is expected to capture them, and "no" otherwise.
TEST_F(TraceEventCallbackTest, TraceEventCallbackAndRecording2) {
  // Nothing is enabled yet.
  TRACE_EVENT_INSTANT0("recording", "no", TRACE_EVENT_SCOPE_GLOBAL);
  TRACE_EVENT_INSTANT0("callback", "no", TRACE_EVENT_SCOPE_GLOBAL);
  // Callback enabled for the "callback" category only.
  TraceLog::GetInstance()->SetEventCallbackEnabled(CategoryFilter("callback"),
                                                   Callback);
  TRACE_EVENT_INSTANT0("recording", "no", TRACE_EVENT_SCOPE_GLOBAL);
  TRACE_EVENT_INSTANT0("callback", "yes", TRACE_EVENT_SCOPE_GLOBAL);
  // Recording enabled for the "recording" category only; both sinks active.
  TraceLog::GetInstance()->SetEnabled(
      CategoryFilter("recording"),
      TraceLog::RECORDING_MODE,
      TraceLog::RECORD_UNTIL_FULL);
  TRACE_EVENT_INSTANT0("recording", "yes", TRACE_EVENT_SCOPE_GLOBAL);
  TRACE_EVENT_INSTANT0("callback", "yes", TRACE_EVENT_SCOPE_GLOBAL);
  // Recording disabled; the callback is still active.
  EndTraceAndFlush();
  TRACE_EVENT_INSTANT0("recording", "no", TRACE_EVENT_SCOPE_GLOBAL);
  TRACE_EVENT_INSTANT0("callback", "yes", TRACE_EVENT_SCOPE_GLOBAL);
  // Callback disabled; nothing is captured any more.
  TraceLog::GetInstance()->SetEventCallbackDisabled();
  TRACE_EVENT_INSTANT0("recording", "no", TRACE_EVENT_SCOPE_GLOBAL);
  TRACE_EVENT_INSTANT0("callback", "no", TRACE_EVENT_SCOPE_GLOBAL);

  DropTracedMetadataRecords();
  // Expect three callback events and one recorded event.
  VerifyCallbackAndRecordedEvents(3, 1);
}

// 3: Enable recording, enable callback, disable callback, disable recording.
// Events are named "yes" when the sink matching their category ("callback"
// or "recording") is expected to capture them, and "no" otherwise.
TEST_F(TraceEventCallbackTest, TraceEventCallbackAndRecording3) {
  // Nothing is enabled yet.
  TRACE_EVENT_INSTANT0("recording", "no", TRACE_EVENT_SCOPE_GLOBAL);
  TRACE_EVENT_INSTANT0("callback", "no", TRACE_EVENT_SCOPE_GLOBAL);
  // Recording enabled for the "recording" category only.
  TraceLog::GetInstance()->SetEnabled(
      CategoryFilter("recording"),
      TraceLog::RECORDING_MODE,
      TraceLog::RECORD_UNTIL_FULL);
  TRACE_EVENT_INSTANT0("recording", "yes", TRACE_EVENT_SCOPE_GLOBAL);
  TRACE_EVENT_INSTANT0("callback", "no", TRACE_EVENT_SCOPE_GLOBAL);
  // Callback enabled for the "callback" category only; both sinks active.
  TraceLog::GetInstance()->SetEventCallbackEnabled(CategoryFilter("callback"),
                                                   Callback);
  TRACE_EVENT_INSTANT0("recording", "yes", TRACE_EVENT_SCOPE_GLOBAL);
  TRACE_EVENT_INSTANT0("callback", "yes", TRACE_EVENT_SCOPE_GLOBAL);
  // Callback disabled; recording is still active.
  TraceLog::GetInstance()->SetEventCallbackDisabled();
  TRACE_EVENT_INSTANT0("recording", "yes", TRACE_EVENT_SCOPE_GLOBAL);
  TRACE_EVENT_INSTANT0("callback", "no", TRACE_EVENT_SCOPE_GLOBAL);
  // Recording disabled; nothing is captured any more.
  EndTraceAndFlush();
  TRACE_EVENT_INSTANT0("recording", "no", TRACE_EVENT_SCOPE_GLOBAL);
  TRACE_EVENT_INSTANT0("callback", "no", TRACE_EVENT_SCOPE_GLOBAL);

  DropTracedMetadataRecords();
  // Expect one callback event and three recorded events.
  VerifyCallbackAndRecordedEvents(1, 3);
}

// 4: Enable recording, enable callback, disable recording, disable callback.
+TEST_F(TraceEventCallbackTest, TraceEventCallbackAndRecording4) { + TRACE_EVENT_INSTANT0("recording", "no", TRACE_EVENT_SCOPE_GLOBAL); + TRACE_EVENT_INSTANT0("callback", "no", TRACE_EVENT_SCOPE_GLOBAL); + TraceLog::GetInstance()->SetEnabled( + CategoryFilter("recording"), + TraceLog::RECORDING_MODE, + TraceLog::RECORD_UNTIL_FULL); + TRACE_EVENT_INSTANT0("recording", "yes", TRACE_EVENT_SCOPE_GLOBAL); + TRACE_EVENT_INSTANT0("callback", "no", TRACE_EVENT_SCOPE_GLOBAL); + TraceLog::GetInstance()->SetEventCallbackEnabled(CategoryFilter("callback"), + Callback); + TRACE_EVENT_INSTANT0("recording", "yes", TRACE_EVENT_SCOPE_GLOBAL); + TRACE_EVENT_INSTANT0("callback", "yes", TRACE_EVENT_SCOPE_GLOBAL); + EndTraceAndFlush(); + TRACE_EVENT_INSTANT0("recording", "no", TRACE_EVENT_SCOPE_GLOBAL); + TRACE_EVENT_INSTANT0("callback", "yes", TRACE_EVENT_SCOPE_GLOBAL); + TraceLog::GetInstance()->SetEventCallbackDisabled(); + TRACE_EVENT_INSTANT0("recording", "no", TRACE_EVENT_SCOPE_GLOBAL); + TRACE_EVENT_INSTANT0("callback", "no", TRACE_EVENT_SCOPE_GLOBAL); + + DropTracedMetadataRecords(); + VerifyCallbackAndRecordedEvents(2, 2); +} + +TEST_F(TraceEventCallbackTest, TraceEventCallbackAndRecordingDuration) { + TraceLog::GetInstance()->SetEventCallbackEnabled(CategoryFilter("*"), + Callback); + { + TRACE_EVENT0("callback", "duration1"); + TraceLog::GetInstance()->SetEnabled( + CategoryFilter("*"), + TraceLog::RECORDING_MODE, + TraceLog::RECORD_UNTIL_FULL); + TRACE_EVENT0("callback", "duration2"); + EndTraceAndFlush(); + TRACE_EVENT0("callback", "duration3"); + } + TraceLog::GetInstance()->SetEventCallbackDisabled(); + + ASSERT_EQ(6u, collected_events_names_.size()); + VerifyCollectedEvent(0, TRACE_EVENT_PHASE_BEGIN, "callback", "duration1"); + VerifyCollectedEvent(1, TRACE_EVENT_PHASE_BEGIN, "callback", "duration2"); + VerifyCollectedEvent(2, TRACE_EVENT_PHASE_BEGIN, "callback", "duration3"); + VerifyCollectedEvent(3, TRACE_EVENT_PHASE_END, "callback", "duration3"); + 
VerifyCollectedEvent(4, TRACE_EVENT_PHASE_END, "callback", "duration2"); + VerifyCollectedEvent(5, TRACE_EVENT_PHASE_END, "callback", "duration1"); +} + +//////////////////////////////////////////////////////////// +// Tests for synthetic delay +// (from chromium-base/debug/trace_event_synthetic_delay_unittest.cc) +//////////////////////////////////////////////////////////// + +namespace { + +const int kTargetDurationMs = 100; +// Allow some leeway in timings to make it possible to run these tests with a +// wall clock time source too. +const int kShortDurationMs = 10; + +} // namespace + +namespace debug { + +class TraceEventSyntheticDelayTest : public KuduTest, + public TraceEventSyntheticDelayClock { + public: + TraceEventSyntheticDelayTest() { + now_ = MonoTime::Min(); + } + + virtual ~TraceEventSyntheticDelayTest() { + ResetTraceEventSyntheticDelays(); + } + + // TraceEventSyntheticDelayClock implementation. + virtual MonoTime Now() OVERRIDE { + AdvanceTime(MonoDelta::FromMilliseconds(kShortDurationMs / 10)); + return now_; + } + + TraceEventSyntheticDelay* ConfigureDelay(const char* name) { + TraceEventSyntheticDelay* delay = TraceEventSyntheticDelay::Lookup(name); + delay->SetClock(this); + delay->SetTargetDuration( + MonoDelta::FromMilliseconds(kTargetDurationMs)); + return delay; + } + + void AdvanceTime(MonoDelta delta) { now_ += delta; } + + int TestFunction() { + MonoTime start = Now(); + { TRACE_EVENT_SYNTHETIC_DELAY("test.Delay"); } + MonoTime end = Now(); + return (end - start).ToMilliseconds(); + } + + int AsyncTestFunctionBegin() { + MonoTime start = Now(); + { TRACE_EVENT_SYNTHETIC_DELAY_BEGIN("test.AsyncDelay"); } + MonoTime end = Now(); + return (end - start).ToMilliseconds(); + } + + int AsyncTestFunctionEnd() { + MonoTime start = Now(); + { TRACE_EVENT_SYNTHETIC_DELAY_END("test.AsyncDelay"); } + MonoTime end = Now(); + return (end - start).ToMilliseconds(); + } + + private: + MonoTime now_; + + 
DISALLOW_COPY_AND_ASSIGN(TraceEventSyntheticDelayTest); +}; + +TEST_F(TraceEventSyntheticDelayTest, StaticDelay) { + TraceEventSyntheticDelay* delay = ConfigureDelay("test.Delay"); + delay->SetMode(TraceEventSyntheticDelay::STATIC); + EXPECT_GE(TestFunction(), kTargetDurationMs); +} + +TEST_F(TraceEventSyntheticDelayTest, OneShotDelay) { + TraceEventSyntheticDelay* delay = ConfigureDelay("test.Delay"); + delay->SetMode(TraceEventSyntheticDelay::ONE_SHOT); + EXPECT_GE(TestFunction(), kTargetDurationMs); + EXPECT_LT(TestFunction(), kShortDurationMs); + + delay->SetTargetDuration( + MonoDelta::FromMilliseconds(kTargetDurationMs)); + EXPECT_GE(TestFunction(), kTargetDurationMs); +} + +TEST_F(TraceEventSyntheticDelayTest, AlternatingDelay) { + TraceEventSyntheticDelay* delay = ConfigureDelay("test.Delay"); + delay->SetMode(TraceEventSyntheticDelay::ALTERNATING); + EXPECT_GE(TestFunction(), kTargetDurationMs); + EXPECT_LT(TestFunction(), kShortDurationMs); + EXPECT_GE(TestFunction(), kTargetDurationMs); + EXPECT_LT(TestFunction(), kShortDurationMs); +} + +TEST_F(TraceEventSyntheticDelayTest, AsyncDelay) { + ConfigureDelay("test.AsyncDelay"); + EXPECT_LT(AsyncTestFunctionBegin(), kShortDurationMs); + EXPECT_GE(AsyncTestFunctionEnd(), kTargetDurationMs / 2); +} + +TEST_F(TraceEventSyntheticDelayTest, AsyncDelayExceeded) { + ConfigureDelay("test.AsyncDelay"); + EXPECT_LT(AsyncTestFunctionBegin(), kShortDurationMs); + AdvanceTime(MonoDelta::FromMilliseconds(kTargetDurationMs)); + EXPECT_LT(AsyncTestFunctionEnd(), kShortDurationMs); +} + +TEST_F(TraceEventSyntheticDelayTest, AsyncDelayNoActivation) { + ConfigureDelay("test.AsyncDelay"); + EXPECT_LT(AsyncTestFunctionEnd(), kShortDurationMs); +} + +TEST_F(TraceEventSyntheticDelayTest, AsyncDelayNested) { + ConfigureDelay("test.AsyncDelay"); + EXPECT_LT(AsyncTestFunctionBegin(), kShortDurationMs); + EXPECT_LT(AsyncTestFunctionBegin(), kShortDurationMs); + EXPECT_LT(AsyncTestFunctionEnd(), kShortDurationMs); + 
EXPECT_GE(AsyncTestFunctionEnd(), kTargetDurationMs / 2); +} + +TEST_F(TraceEventSyntheticDelayTest, AsyncDelayUnbalanced) { + ConfigureDelay("test.AsyncDelay"); + EXPECT_LT(AsyncTestFunctionBegin(), kShortDurationMs); + EXPECT_GE(AsyncTestFunctionEnd(), kTargetDurationMs / 2); + EXPECT_LT(AsyncTestFunctionEnd(), kShortDurationMs); + + EXPECT_LT(AsyncTestFunctionBegin(), kShortDurationMs); + EXPECT_GE(AsyncTestFunctionEnd(), kTargetDurationMs / 2); +} + +TEST_F(TraceEventSyntheticDelayTest, ResetDelays) { + ConfigureDelay("test.Delay"); + ResetTraceEventSyntheticDelays(); + EXPECT_LT(TestFunction(), kShortDurationMs); +} + +TEST_F(TraceEventSyntheticDelayTest, BeginParallel) { + TraceEventSyntheticDelay* delay = ConfigureDelay("test.AsyncDelay"); + MonoTime end_times[2]; + MonoTime start_time = Now(); + + delay->BeginParallel(&end_times[0]); + EXPECT_FALSE(!end_times[0].Initialized()); + + delay->BeginParallel(&end_times[1]); + EXPECT_FALSE(!end_times[1].Initialized()); + + delay->EndParallel(end_times[0]); + EXPECT_GE((Now() - start_time).ToMilliseconds(), kTargetDurationMs); + + start_time = Now(); + delay->EndParallel(end_times[1]); + EXPECT_LT((Now() - start_time).ToMilliseconds(), kShortDurationMs); +} + +TEST_F(TraceTest, TestVLogTrace) { + for (FLAGS_v = 0; FLAGS_v <= 1; FLAGS_v++) { + TraceLog* tl = TraceLog::GetInstance(); + tl->SetEnabled(CategoryFilter(CategoryFilter::kDefaultCategoryFilterString), + TraceLog::RECORDING_MODE, + TraceLog::RECORD_CONTINUOUSLY); + VLOG_AND_TRACE("test", 1) << "hello world"; + tl->SetDisabled(); + string trace_json = TraceResultBuffer::FlushTraceLogToString(); + ASSERT_STR_CONTAINS(trace_json, "hello world"); + ASSERT_STR_CONTAINS(trace_json, "trace-test.cc"); + } +} + +namespace { +string FunctionWithSideEffect(bool* b) { + *b = true; + return "function-result"; +} +} // anonymous namespace + +// Test that, if tracing is not enabled, a VLOG_AND_TRACE doesn't evaluate its +// arguments. 
TEST_F(TraceTest, TestVLogTraceLazyEvaluation) {
  // With verbosity 0 and no tracing enabled, the streamed expression must
  // not be evaluated.
  FLAGS_v = 0;
  bool function_run = false;
  VLOG_AND_TRACE("test", 1) << FunctionWithSideEffect(&function_run);
  ASSERT_FALSE(function_run);

  // If we enable verbose logging, we should run the side effect even though
  // trace logging is disabled.
  FLAGS_v = 1;
  VLOG_AND_TRACE("test", 1) << FunctionWithSideEffect(&function_run);
  ASSERT_TRUE(function_run);
}

// Exercises VLOG_AND_TRACE while the trace log is in ECHO_TO_CONSOLE mode.
// There are no assertions; the test only checks that this combination runs.
TEST_F(TraceTest, TestVLogAndEchoToConsole) {
  TraceLog* tl = TraceLog::GetInstance();
  tl->SetEnabled(CategoryFilter(CategoryFilter::kDefaultCategoryFilterString),
                 TraceLog::RECORDING_MODE,
                 TraceLog::ECHO_TO_CONSOLE);
  FLAGS_v = 1;
  VLOG_AND_TRACE("test", 1) << "hello world";
  tl->SetDisabled();
}

// Checks counter accumulation, the sorted JSON rendering of the counters,
// and the scope-latency counter macro.
TEST_F(TraceTest, TestTraceMetrics) {
  scoped_refptr<Trace> trace(new Trace);
  trace->metrics()->Increment("foo", 10);
  trace->metrics()->Increment("bar", 10);
  // 0 + 1 + ... + 999 == 499500.
  for (int i = 0; i < 1000; i++) {
    trace->metrics()->Increment("baz", i);
  }
  EXPECT_EQ("{\"bar\":10,\"baz\":499500,\"foo\":10}",
            trace->MetricsAsJSON());

  {
    ADOPT_TRACE(trace.get());
    TRACE_COUNTER_SCOPE_LATENCY_US("test_scope_us");
    SleepFor(MonoDelta::FromMilliseconds(100));
  }
  // The scope slept for 100ms; allow some slack below that.
  auto m = trace->metrics()->Get();
  EXPECT_GE(m["test_scope_us"], 80 * 1000);
}

} // namespace debug
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/trace.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/trace.cc b/be/src/kudu/util/trace.cc new file mode 100644 index 0000000..698c915 --- /dev/null +++ b/be/src/kudu/util/trace.cc @@ -0,0 +1,261 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.
The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "kudu/util/trace.h"

#include <iomanip>
#include <ios>
#include <iostream>
#include <map>
#include <mutex>
#include <string>
#include <sstream>
#include <utility>
#include <vector>

#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/walltime.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/jsonwriter.h"

namespace kudu {

using strings::internal::SubstituteArg;

// Definition of the per-thread "current trace" pointer declared in trace.h.
__thread Trace* Trace::threadlocal_trace_;

// Creates an empty trace: a fresh arena for the entries and an empty
// linked list.
Trace::Trace()
  : arena_(new ThreadSafeArena(1024, 128*1024)),
    entries_head_(nullptr),
    entries_tail_(nullptr) {
}

Trace::~Trace() {
}

// Struct which precedes each entry in the trace.
struct TraceEntry {
  MicrosecondsInt64 timestamp_micros;

  // The source file and line number which generated the trace message.
  const char* file_path;
  int line_number;

  // Length in bytes of the message stored after this header.
  uint32_t message_len;
  // Next entry in the trace's singly linked list, or nullptr for the tail.
  TraceEntry* next;

  // The actual trace message follows the entry header.
  char* message() {
    return reinterpret_cast<char*>(this) + sizeof(*this);
  }
};

// Get the part of filepath after the last path separator.
// (Doesn't modify filepath, contrary to basename() in libgen.h.)
// Borrowed from glog.
+static const char* const_basename(const char* filepath) { + const char* base = strrchr(filepath, '/'); +#ifdef OS_WINDOWS // Look for either path separator in Windows + if (!base) + base = strrchr(filepath, '\\'); +#endif + return base ? (base+1) : filepath; +} + + +void Trace::SubstituteAndTrace(const char* file_path, + int line_number, + StringPiece format, + const SubstituteArg& arg0, const SubstituteArg& arg1, + const SubstituteArg& arg2, const SubstituteArg& arg3, + const SubstituteArg& arg4, const SubstituteArg& arg5, + const SubstituteArg& arg6, const SubstituteArg& arg7, + const SubstituteArg& arg8, const SubstituteArg& arg9) { + const SubstituteArg* const args_array[] = { + &arg0, &arg1, &arg2, &arg3, &arg4, &arg5, &arg6, &arg7, &arg8, &arg9, nullptr + }; + + int msg_len = strings::internal::SubstitutedSize(format, args_array); + TraceEntry* entry = NewEntry(msg_len, file_path, line_number); + SubstituteToBuffer(format, args_array, entry->message()); + AddEntry(entry); +} + +TraceEntry* Trace::NewEntry(int msg_len, const char* file_path, int line_number) { + int size = sizeof(TraceEntry) + msg_len; + uint8_t* dst = reinterpret_cast<uint8_t*>(arena_->AllocateBytes(size)); + TraceEntry* entry = reinterpret_cast<TraceEntry*>(dst); + entry->timestamp_micros = GetCurrentTimeMicros(); + entry->message_len = msg_len; + entry->file_path = file_path; + entry->line_number = line_number; + return entry; +} + +void Trace::AddEntry(TraceEntry* entry) { + std::lock_guard<simple_spinlock> l(lock_); + entry->next = nullptr; + + if (entries_tail_ != nullptr) { + entries_tail_->next = entry; + } else { + DCHECK(entries_head_ == nullptr); + entries_head_ = entry; + } + entries_tail_ = entry; +} + +void Trace::Dump(std::ostream* out, int flags) const { + // Gather a copy of the list of entries under the lock. 
This is fast + // enough that we aren't worried about stalling concurrent tracers + // (whereas doing the logging itself while holding the lock might be + // too slow, if the output stream is a file, for example). + vector<TraceEntry*> entries; + vector<pair<StringPiece, scoped_refptr<Trace>>> child_traces; + { + std::lock_guard<simple_spinlock> l(lock_); + for (TraceEntry* cur = entries_head_; + cur != nullptr; + cur = cur->next) { + entries.push_back(cur); + } + + child_traces = child_traces_; + } + + // Save original flags. + std::ios::fmtflags save_flags(out->flags()); + + int64_t prev_usecs = 0; + for (TraceEntry* e : entries) { + // Log format borrowed from glog/logging.cc + time_t secs_since_epoch = e->timestamp_micros / 1000000; + int usecs = e->timestamp_micros % 1000000; + struct tm tm_time; + localtime_r(&secs_since_epoch, &tm_time); + + int64_t usecs_since_prev = 0; + if (prev_usecs != 0) { + usecs_since_prev = e->timestamp_micros - prev_usecs; + } + prev_usecs = e->timestamp_micros; + + using std::setw; + out->fill('0'); + + *out << setw(2) << (1 + tm_time.tm_mon) + << setw(2) << tm_time.tm_mday + << ' ' + << setw(2) << tm_time.tm_hour << ':' + << setw(2) << tm_time.tm_min << ':' + << setw(2) << tm_time.tm_sec << '.' + << setw(6) << usecs << ' '; + if (flags & INCLUDE_TIME_DELTAS) { + out->fill(' '); + *out << "(+" << setw(6) << usecs_since_prev << "us) "; + } + *out << const_basename(e->file_path) << ':' << e->line_number + << "] "; + out->write(reinterpret_cast<char*>(e) + sizeof(TraceEntry), + e->message_len); + *out << std::endl; + } + + for (const auto& entry : child_traces) { + const auto& t = entry.second; + *out << "Related trace '" << entry.first << "':" << std::endl; + *out << t->DumpToString(flags & (~INCLUDE_METRICS)); + } + + if (flags & INCLUDE_METRICS) { + *out << "Metrics: " << MetricsAsJSON(); + } + + // Restore stream flags. 
+ out->flags(save_flags); +} + +string Trace::DumpToString(int flags) const { + std::ostringstream s; + Dump(&s, flags); + return s.str(); +} + +string Trace::MetricsAsJSON() const { + std::ostringstream s; + JsonWriter jw(&s, JsonWriter::COMPACT); + MetricsToJSON(&jw); + return s.str(); +} + +void Trace::MetricsToJSON(JsonWriter* jw) const { + // Convert into a map with 'std::string' keys instead of 'const char*' + // keys, so that the results are in a consistent (sorted) order. + std::map<string, int64_t> counters; + for (const auto& entry : metrics_.Get()) { + counters[entry.first] = entry.second; + } + + jw->StartObject(); + for (const auto& e : counters) { + jw->String(e.first); + jw->Int64(e.second); + } + vector<pair<StringPiece, scoped_refptr<Trace>>> child_traces; + { + std::lock_guard<simple_spinlock> l(lock_); + child_traces = child_traces_; + } + + if (!child_traces.empty()) { + jw->String("child_traces"); + jw->StartArray(); + + for (const auto& e : child_traces) { + jw->StartArray(); + jw->String(e.first.data(), e.first.size()); + e.second->MetricsToJSON(jw); + jw->EndArray(); + } + jw->EndArray(); + } + jw->EndObject(); +} + +void Trace::DumpCurrentTrace() { + Trace* t = CurrentTrace(); + if (t == nullptr) { + LOG(INFO) << "No trace is currently active."; + return; + } + t->Dump(&std::cerr, true); +} + +void Trace::AddChildTrace(StringPiece label, Trace* child_trace) { + CHECK(arena_->RelocateStringPiece(label, &label)); + + std::lock_guard<simple_spinlock> l(lock_); + scoped_refptr<Trace> ptr(child_trace); + child_traces_.emplace_back(label, ptr); +} + +std::vector<std::pair<StringPiece, scoped_refptr<Trace>>> Trace::ChildTraces() const { + std::lock_guard<simple_spinlock> l(lock_); + return child_traces_; +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/trace.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/trace.h 
b/be/src/kudu/util/trace.h new file mode 100644 index 0000000..4d6df24 --- /dev/null +++ b/be/src/kudu/util/trace.h @@ -0,0 +1,308 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_TRACE_H +#define KUDU_UTIL_TRACE_H + +#include <iosfwd> +#include <string> +#include <utility> +#include <vector> + +#include "kudu/gutil/macros.h" +#include "kudu/gutil/strings/stringpiece.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/gutil/threading/thread_collision_warner.h" +#include "kudu/gutil/walltime.h" +#include "kudu/util/locks.h" +#include "kudu/util/trace_metrics.h" + +// Adopt a Trace on the current thread for the duration of the current +// scope. The old current Trace is restored when the scope is exited. +// +// 't' should be a Trace* pointer. +#define ADOPT_TRACE(t) kudu::ScopedAdoptTrace _adopt_trace(t); + +// Issue a trace message, if tracing is enabled in the current thread. +// See Trace::SubstituteAndTrace for arguments. +// Example: +// TRACE("Acquired timestamp $0", timestamp); +#define TRACE(format, substitutions...) 
\ + do { \ + kudu::Trace* _trace = Trace::CurrentTrace(); \ + if (_trace) { \ + _trace->SubstituteAndTrace(__FILE__, __LINE__, (format), \ + ##substitutions); \ + } \ + } while (0); + +// Like the above, but takes the trace pointer as an explicit argument. +#define TRACE_TO(trace, format, substitutions...) \ + (trace)->SubstituteAndTrace(__FILE__, __LINE__, (format), ##substitutions) + +// Increment a counter associated with the current trace. +// +// Each trace contains a map of counters which can be used to keep +// request-specific statistics. It is significantly faster to increment +// a trace counter compared to logging a message. Additionally, having +// slightly more structured information makes it easier to aggregate +// and show information back to operators. +// +// NOTE: the 'counter_name' MUST be a string which stays alive forever. +// Typically, this is a compile-time constant. If something other than +// a constant is required, use TraceMetric::InternName() in order to +// create a string which will last for the process lifetime. Of course, +// these strings will never be cleaned up, so it's important to use this +// judiciously. +// +// If no trace is active, this does nothing and does not evaluate its +// parameters. +#define TRACE_COUNTER_INCREMENT(counter_name, val) \ + do { \ + kudu::Trace* _trace = Trace::CurrentTrace(); \ + if (_trace) { \ + _trace->metrics()->Increment(counter_name, val); \ + } \ + } while (0); + +// Increment a counter for the amount of wall time spent in the current +// scope. For example: +// +// void DoFoo() { +// TRACE_COUNTER_SCOPE_LATENCY_US("foo_us"); +// ... do expensive Foo thing +// } +// +// will result in a trace metric indicating the number of microseconds spent +// in invocations of DoFoo(). 
+#define TRACE_COUNTER_SCOPE_LATENCY_US(counter_name) \ + ::kudu::ScopedTraceLatencyCounter _scoped_latency(counter_name) + +// Construct a constant C string counter name which acts as a sort of +// coarse-grained histogram for trace metrics. +#define BUCKETED_COUNTER_NAME(prefix, duration_us) \ + [=]() -> const char* { \ + if (duration_us >= 100 * 1000) { \ + return prefix "_gt_100_ms"; \ + } else if (duration_us >= 10 * 1000) { \ + return prefix "_10-100_ms"; \ + } else if (duration_us >= 1000) { \ + return prefix "_1-10_ms"; \ + } else { \ + return prefix "_lt_1ms"; \ + } \ + }(); + +namespace kudu { + +class JsonWriter; +class ThreadSafeArena; +struct TraceEntry; + +// A trace for a request or other process. This supports collecting trace entries +// from a number of threads, and later dumping the results to a stream. +// +// Callers should generally not add trace messages directly using the public +// methods of this class. Rather, the TRACE(...) macros defined above should +// be used such that file/line numbers are automatically included, etc. +// +// This class is thread-safe. +class Trace : public RefCountedThreadSafe<Trace> { + public: + Trace(); + + // Logs a message into the trace buffer. + // + // See strings::Substitute for details. + // + // N.B.: the file path passed here is not copied, so should be a static + // constant (eg __FILE__). 
+ void SubstituteAndTrace(const char* filepath, int line_number, + StringPiece format, + const strings::internal::SubstituteArg& arg0 = + strings::internal::SubstituteArg::NoArg, + const strings::internal::SubstituteArg& arg1 = + strings::internal::SubstituteArg::NoArg, + const strings::internal::SubstituteArg& arg2 = + strings::internal::SubstituteArg::NoArg, + const strings::internal::SubstituteArg& arg3 = + strings::internal::SubstituteArg::NoArg, + const strings::internal::SubstituteArg& arg4 = + strings::internal::SubstituteArg::NoArg, + const strings::internal::SubstituteArg& arg5 = + strings::internal::SubstituteArg::NoArg, + const strings::internal::SubstituteArg& arg6 = + strings::internal::SubstituteArg::NoArg, + const strings::internal::SubstituteArg& arg7 = + strings::internal::SubstituteArg::NoArg, + const strings::internal::SubstituteArg& arg8 = + strings::internal::SubstituteArg::NoArg, + const strings::internal::SubstituteArg& arg9 = + strings::internal::SubstituteArg::NoArg); + + // Dump the trace buffer to the given output stream. + // + enum { + NO_FLAGS = 0, + + // If set, calculate and print the difference between successive trace messages. + INCLUDE_TIME_DELTAS = 1 << 0, + // If set, include a 'Metrics' line showing any attached trace metrics. + INCLUDE_METRICS = 1 << 1, + + INCLUDE_ALL = INCLUDE_TIME_DELTAS | INCLUDE_METRICS + }; + void Dump(std::ostream* out, int flags) const; + + // Dump the trace buffer as a string. + std::string DumpToString(int flags = INCLUDE_ALL) const; + + std::string MetricsAsJSON() const; + + // Attaches the given trace which will get appended at the end when Dumping. + // + // The 'label' does not necessarily have to be unique, and is used to identify + // the child trace when dumped. The contents of the StringPiece are copied + // into this trace's arena. + void AddChildTrace(StringPiece label, Trace* child_trace); + + // Return a copy of the current set of related "child" traces. 
+ std::vector<std::pair<StringPiece, scoped_refptr<Trace>>> ChildTraces() const; + + // Return the current trace attached to this thread, if there is one. + static Trace* CurrentTrace() { + return threadlocal_trace_; + } + + // Simple function to dump the current trace to stderr, if one is + // available. This is meant for usage when debugging in gdb via + // 'call kudu::Trace::DumpCurrentTrace();'. + static void DumpCurrentTrace(); + + TraceMetrics* metrics() { + return &metrics_; + } + const TraceMetrics& metrics() const { + return metrics_; + } + + private: + friend class ScopedAdoptTrace; + friend class RefCountedThreadSafe<Trace>; + ~Trace(); + + // The current trace for this thread. Threads should only set this using + // using ScopedAdoptTrace, which handles reference counting the underlying + // object. + static __thread Trace* threadlocal_trace_; + + // Allocate a new entry from the arena, with enough space to hold a + // message of length 'len'. + TraceEntry* NewEntry(int len, const char* file_path, int line_number); + + // Add the entry to the linked list of entries. + void AddEntry(TraceEntry* entry); + + void MetricsToJSON(JsonWriter* jw) const; + + gscoped_ptr<ThreadSafeArena> arena_; + + // Lock protecting the entries linked list. + mutable simple_spinlock lock_; + // The head of the linked list of entries (allocated inside arena_) + TraceEntry* entries_head_; + // The tail of the linked list of entries (allocated inside arena_) + TraceEntry* entries_tail_; + + std::vector<std::pair<StringPiece, scoped_refptr<Trace>>> child_traces_; + + TraceMetrics metrics_; + + DISALLOW_COPY_AND_ASSIGN(Trace); +}; + +// Adopt a Trace object into the current thread for the duration +// of this object. 
// This should only be used on the stack (and thus created and destroyed
// on the same thread)
//
// The constructor remembers the trace that was current on this thread and
// publishes 't' in its place, taking a reference on 't' if it is non-null.
// The destructor puts the remembered trace back and then drops a reference
// on whichever trace was current at that point.
class ScopedAdoptTrace {
 public:
  explicit ScopedAdoptTrace(Trace* t) :
    old_trace_(Trace::threadlocal_trace_) {
    Trace::threadlocal_trace_ = t;
    if (t) {
      t->AddRef();
    }
    DFAKE_SCOPED_LOCK_THREAD_LOCKED(ctor_dtor_);
  }

  ~ScopedAdoptTrace() {
    // Unpublish first: swap the previous trace back in before releasing.
    auto t = Trace::threadlocal_trace_;
    Trace::threadlocal_trace_ = old_trace_;

    // It's critical that we Release() the reference count on 't' only
    // after we've unset the thread-local variable. Otherwise, we can hit
    // a nasty interaction with tcmalloc contention profiling. Consider
    // the following sequence:
    //
    // 1. threadlocal_trace_ has refcount = 1
    // 2. we call threadlocal_trace_->Release() which decrements refcount to 0
    // 3. this calls 'delete' on the Trace object
    // 3a. this calls tcmalloc free() on the Trace and various sub-objects
    // 3b. the free() calls may end up experiencing contention in tcmalloc
    // 3c. we try to account the contention in threadlocal_trace_'s TraceMetrics,
    // but it has already been freed.
    //
    // In the best case, we just scribble into some free tcmalloc memory. In the
    // worst case, tcmalloc would have already re-used this memory for a new
    // allocation on another thread, and we end up overwriting someone else's memory.
    //
    // Waiting to Release() only after 'unpublishing' the trace solves this.
    if (t) {
      t->Release();
    }
    DFAKE_SCOPED_LOCK_THREAD_LOCKED(ctor_dtor_);
  }

 private:
  DFAKE_MUTEX(ctor_dtor_);
  // The trace that was current on this thread before this object adopted
  // its own; restored by the destructor.
  Trace* old_trace_;

  DISALLOW_COPY_AND_ASSIGN(ScopedAdoptTrace);
};

// Implementation for TRACE_COUNTER_SCOPE_LATENCY_US(...) macro above.
+class ScopedTraceLatencyCounter { + public: + explicit ScopedTraceLatencyCounter(const char* counter) + : counter_(counter), + start_time_(GetCurrentTimeMicros()) { + } + + ~ScopedTraceLatencyCounter() { + TRACE_COUNTER_INCREMENT(counter_, GetCurrentTimeMicros() - start_time_); + } + + private: + const char* const counter_; + MicrosecondsInt64 start_time_; + DISALLOW_COPY_AND_ASSIGN(ScopedTraceLatencyCounter); +}; + +} // namespace kudu +#endif /* KUDU_UTIL_TRACE_H */ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/trace_metrics.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/trace_metrics.cc b/be/src/kudu/util/trace_metrics.cc new file mode 100644 index 0000000..ba4ad97 --- /dev/null +++ b/be/src/kudu/util/trace_metrics.cc @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/trace_metrics.h" + +#include <algorithm> +#include <ctype.h> +#include <glog/stl_logging.h> +#include <map> +#include <mutex> +#include <string> + +#include "kudu/util/debug/leakcheck_disabler.h" + +using std::string; + +namespace kudu { + +// Make glog's STL-compatible operators visible inside this namespace. 
+using ::operator<<; + +namespace { + +static simple_spinlock g_intern_map_lock; +typedef std::map<string, const char*> InternMap; +static InternMap* g_intern_map; + +} // anonymous namespace + +const char* TraceMetrics::InternName(const string& name) { + DCHECK(std::all_of(name.begin(), name.end(), [] (char c) { return isprint(c); } )) + << "not printable: " << name; + + debug::ScopedLeakCheckDisabler no_leakcheck; + std::lock_guard<simple_spinlock> l(g_intern_map_lock); + if (g_intern_map == nullptr) { + g_intern_map = new InternMap(); + } + + InternMap::iterator it = g_intern_map->find(name); + if (it != g_intern_map->end()) { + return it->second; + } + + const char* dup = strdup(name.c_str()); + (*g_intern_map)[name] = dup; + + // We don't expect this map to grow large. + DCHECK_LT(g_intern_map->size(), 100) << + "Too many interned strings: " << *g_intern_map; + + return dup; +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/trace_metrics.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/trace_metrics.h b/be/src/kudu/util/trace_metrics.h new file mode 100644 index 0000000..a230821 --- /dev/null +++ b/be/src/kudu/util/trace_metrics.h @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include "kudu/gutil/macros.h" +#include "kudu/gutil/map-util.h" +#include "kudu/util/atomic.h" +#include "kudu/util/locks.h" + +#include <map> +#include <mutex> +#include <string> + +namespace kudu { + +// A simple map of constant string names to integer counters. +// +// Typically, the TRACE_COUNTER_INCREMENT(...) macro defined in +// trace.h is used to increment a counter within this map. +// +// This currently is just a thin wrapper around a spinlocked map, +// but if it becomes noticeable in the CPU profile, various optimizations +// are plausible. +class TraceMetrics { + public: + TraceMetrics() : tcmalloc_contention_cycles_(0) {} + ~TraceMetrics() {} + + // Internalize the given string by duplicating it into a process-wide + // pool. If this string has already been interned, returns a pointer + // to a previous instance. Otherwise, copies it into the pool. + // + // The resulting strings are purposefully leaked, so this should only + // be used in cases where the number of unique strings that will be + // passed is relatively low (i.e. not user-specified). + // + // Because 'name' is exposed back to operators, it must be a printable + // ASCII string. + static const char* InternName(const std::string& name); + + // Increment the given counter. + void Increment(const char* name, int64_t amount); + + // Return a copy of the current counter map. + std::map<const char*, int64_t> Get() const; + + // Increment the number of cycles spent in tcmalloc lock contention. + // + // tcmalloc contention is stored separately from other metrics since + // it is incremented from a code path that prohibits allocation. + void IncrementTcmallocContentionCycles(int64_t cycles) { + tcmalloc_contention_cycles_.IncrementBy(cycles); + } + + // Return metric's current value. + // + // NOTE: the 'name' MUST be the same const char* which is used for + // insertion. 
This is because we do pointer-wise comparison internally. + int64_t GetMetric(const char* name) const; + + private: + mutable simple_spinlock lock_; + std::map<const char*, int64_t> counters_; + AtomicInt<int64_t> tcmalloc_contention_cycles_; + + DISALLOW_COPY_AND_ASSIGN(TraceMetrics); +}; + +inline void TraceMetrics::Increment(const char* name, int64_t amount) { + std::lock_guard<simple_spinlock> l(lock_); + counters_[name] += amount; +} + +inline std::map<const char*, int64_t> TraceMetrics::Get() const { + std::unique_lock<simple_spinlock> l(lock_); + auto m = counters_; + l.unlock(); + + auto v = tcmalloc_contention_cycles_.Load(); + if (v > 0) { + m["tcmalloc_contention_cycles"] = v; + } + return m; +} + +inline int64_t TraceMetrics::GetMetric(const char* name) const { + std::lock_guard<simple_spinlock> l(lock_); + return FindWithDefault(counters_, name, 0); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/url-coding-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/url-coding-test.cc b/be/src/kudu/util/url-coding-test.cc new file mode 100644 index 0000000..e83de64 --- /dev/null +++ b/be/src/kudu/util/url-coding-test.cc @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. +#include <stdlib.h> +#include <stdio.h> +#include <iostream> +#include <gtest/gtest.h> +#include "kudu/util/url-coding.h" + +using namespace std; // NOLINT(*) + +namespace kudu { + +// Tests encoding/decoding of input. If expected_encoded is non-empty, the +// encoded string is validated against it. +void TestUrl(const string& input, const string& expected_encoded, bool hive_compat) { + string intermediate; + UrlEncode(input, &intermediate, hive_compat); + string output; + if (!expected_encoded.empty()) { + EXPECT_EQ(expected_encoded, intermediate); + } + EXPECT_TRUE(UrlDecode(intermediate, &output, hive_compat)); + EXPECT_EQ(input, output); + + // Convert string to vector and try that also + vector<uint8_t> input_vector; + input_vector.resize(input.size()); + if (!input.empty()) { + memcpy(&input_vector[0], input.c_str(), input.size()); + } + string intermediate2; + UrlEncode(input_vector, &intermediate2, hive_compat); + EXPECT_EQ(intermediate, intermediate2); +} + +void TestBase64(const string& input, const string& expected_encoded) { + string intermediate; + Base64Encode(input, &intermediate); + string output; + if (!expected_encoded.empty()) { + EXPECT_EQ(intermediate, expected_encoded); + } + EXPECT_TRUE(Base64Decode(intermediate, &output)); + EXPECT_EQ(input, output); + + // Convert string to vector and try that also + vector<uint8_t> input_vector; + input_vector.resize(input.size()); + memcpy(&input_vector[0], input.c_str(), input.size()); + string intermediate2; + Base64Encode(input_vector, &intermediate2); + EXPECT_EQ(intermediate, intermediate2); +} + +// Test URL encoding. Check that the values that are put in are the +// same that come out. 
+TEST(UrlCodingTest, Basic) { + string input = "ABCDEFGHIJKLMNOPQRSTUWXYZ1234567890~!@#$%^&*()<>?,./:\";'{}|[]\\_+-="; + TestUrl(input, "", false); + TestUrl(input, "", true); +} + +TEST(UrlCodingTest, HiveExceptions) { + TestUrl(" +", " +", true); +} + +TEST(UrlCodingTest, BlankString) { + TestUrl("", "", false); + TestUrl("", "", true); +} + +TEST(UrlCodingTest, PathSeparators) { + TestUrl("/home/impala/directory/", "%2Fhome%2Fimpala%2Fdirectory%2F", false); + TestUrl("/home/impala/directory/", "%2Fhome%2Fimpala%2Fdirectory%2F", true); +} + +TEST(Base64Test, Basic) { + TestBase64("a", "YQ=="); + TestBase64("ab", "YWI="); + TestBase64("abc", "YWJj"); + TestBase64("abcd", "YWJjZA=="); + TestBase64("abcde", "YWJjZGU="); + TestBase64("abcdef", "YWJjZGVm"); +} + +TEST(HtmlEscapingTest, Basic) { + string before = "<html><body>&"; + ostringstream after; + EscapeForHtml(before, &after); + EXPECT_EQ(after.str(), "<html><body>&amp"); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/url-coding.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/url-coding.cc b/be/src/kudu/util/url-coding.cc new file mode 100644 index 0000000..afbaa44 --- /dev/null +++ b/be/src/kudu/util/url-coding.cc @@ -0,0 +1,203 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "kudu/util/url-coding.h" + +#include <algorithm> +#include <exception> +#include <sstream> + +#include <boost/algorithm/string.hpp> +#include <boost/archive/iterators/base64_from_binary.hpp> +#include <boost/archive/iterators/binary_from_base64.hpp> +#include <boost/archive/iterators/transform_width.hpp> +#include <glog/logging.h> + +using std::string; +using std::vector; +using namespace boost::archive::iterators; // NOLINT(*) + +namespace kudu { + +// Hive selectively encodes characters. This is the whitelist of +// characters it will encode. +// See common/src/java/org/apache/hadoop/hive/common/FileUtils.java +// in the Hive source code for the source of this list. +static boost::function<bool (char)> HiveShouldEscape = boost::is_any_of("\"#%\\*/:=?\u00FF"); // NOLINT(*) + +// It is more convenient to maintain the complement of the set of +// characters to escape when not in Hive-compat mode. +static boost::function<bool (char)> ShouldNotEscape = boost::is_any_of("-_.~"); // NOLINT(*) + +static inline void UrlEncode(const char* in, int in_len, string* out, bool hive_compat) { + (*out).reserve(in_len); + std::ostringstream ss; + for (int i = 0; i < in_len; ++i) { + const char ch = in[i]; + // Escape the character iff a) we are in Hive-compat mode and the + // character is in the Hive whitelist or b) we are not in + // Hive-compat mode, and the character is not alphanumeric or one + // of the four commonly excluded characters. 
+ if ((hive_compat && HiveShouldEscape(ch)) || + (!hive_compat && !(isalnum(ch) || ShouldNotEscape(ch)))) { + ss << '%' << std::uppercase << std::hex << static_cast<uint32_t>(ch); + } else { + ss << ch; + } + } + + (*out) = ss.str(); +} + +void UrlEncode(const vector<uint8_t>& in, string* out, bool hive_compat) { + if (in.empty()) { + *out = ""; + } else { + UrlEncode(reinterpret_cast<const char*>(&in[0]), in.size(), out, hive_compat); + } +} + +void UrlEncode(const string& in, string* out, bool hive_compat) { + UrlEncode(in.c_str(), in.size(), out, hive_compat); +} + +string UrlEncodeToString(const std::string& in, bool hive_compat) { + string ret; + UrlEncode(in, &ret, hive_compat); + return ret; +} + +// Adapted from +// http://www.boost.org/doc/libs/1_40_0/doc/html/boost_asio/ +// example/http/server3/request_handler.cpp +// See http://www.boost.org/LICENSE_1_0.txt for license for this method. +bool UrlDecode(const string& in, string* out, bool hive_compat) { + out->clear(); + out->reserve(in.size()); + for (size_t i = 0; i < in.size(); ++i) { + if (in[i] == '%') { + if (i + 3 <= in.size()) { + int value = 0; + std::istringstream is(in.substr(i + 1, 2)); + if (is >> std::hex >> value) { + (*out) += static_cast<char>(value); + i += 2; + } else { + return false; + } + } else { + return false; + } + } else if (!hive_compat && in[i] == '+') { // Hive does not encode ' ' as '+' + (*out) += ' '; + } else { + (*out) += in[i]; + } + } + return true; +} + +static inline void Base64Encode(const char* in, int in_len, std::ostringstream* out) { + typedef base64_from_binary<transform_width<const char*, 6, 8> > base64_encode; + // Base64 encodes 8 byte chars as 6 bit values. 
+ std::ostringstream::pos_type len_before = out->tellp(); + copy(base64_encode(in), base64_encode(in + in_len), std::ostream_iterator<char>(*out)); + int bytes_written = out->tellp() - len_before; + // Pad with = to make it valid base64 encoded string + int num_pad = bytes_written % 4; + if (num_pad != 0) { + num_pad = 4 - num_pad; + for (int i = 0; i < num_pad; ++i) { + (*out) << "="; + } + } + DCHECK_EQ(out->str().size() % 4, 0); +} + +void Base64Encode(const vector<uint8_t>& in, string* out) { + if (in.empty()) { + *out = ""; + } else { + std::ostringstream ss; + Base64Encode(in, &ss); + *out = ss.str(); + } +} + +void Base64Encode(const vector<uint8_t>& in, std::ostringstream* out) { + if (!in.empty()) { + // Boost does not like non-null terminated strings + string tmp(reinterpret_cast<const char*>(&in[0]), in.size()); + Base64Encode(tmp.c_str(), tmp.size(), out); + } +} + +void Base64Encode(const string& in, string* out) { + std::ostringstream ss; + Base64Encode(in.c_str(), in.size(), &ss); + *out = ss.str(); +} + +void Base64Encode(const string& in, std::ostringstream* out) { + Base64Encode(in.c_str(), in.size(), out); +} + +bool Base64Decode(const string& in, string* out) { + typedef transform_width<binary_from_base64<string::const_iterator>, 8, 6> base64_decode; + string tmp = in; + // Replace padding with base64 encoded NULL + replace(tmp.begin(), tmp.end(), '=', 'A'); + try { + *out = string(base64_decode(tmp.begin()), base64_decode(tmp.end())); + } catch(std::exception& e) { + return false; + } + + // Remove trailing '\0' that were added as padding. Since \0 is special, + // the boost functions get confused so do this manually. 
+ int num_padded_chars = 0; + for (int i = out->size() - 1; i >= 0; --i) { + if ((*out)[i] != '\0') break; + ++num_padded_chars; + } + out->resize(out->size() - num_padded_chars); + return true; +} + +void EscapeForHtml(const string& in, std::ostringstream* out) { + DCHECK(out != nullptr); + for (const char& c : in) { + switch (c) { + case '<': (*out) << "<"; + break; + case '>': (*out) << ">"; + break; + case '&': (*out) << "&"; + break; + default: (*out) << c; + } + } +} + +std::string EscapeForHtmlToString(const std::string& in) { + std::ostringstream str; + EscapeForHtml(in, &str); + return str.str(); +} + +} // namespace kudu
