http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/threadpool.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/threadpool.h b/be/src/kudu/util/threadpool.h
new file mode 100644
index 0000000..7ac6d17
--- /dev/null
+++ b/be/src/kudu/util/threadpool.h
@@ -0,0 +1,257 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_THREAD_POOL_H
+#define KUDU_UTIL_THREAD_POOL_H
+
+#include <boost/function.hpp>
+#include <gtest/gtest_prod.h>
+#include <list>
+#include <memory>
+#include <unordered_set>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/callback_forward.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Histogram;
+class Thread;
+class ThreadPool;
+class Trace;
+
+class Runnable {
+ public:
+  virtual void Run() = 0;
+  virtual ~Runnable() {}
+};
+
+// ThreadPool takes a lot of arguments. We provide sane defaults with a 
builder.
+//
+// name: Used for debugging output and default names of the worker threads.
+//    Since thread names are limited to 16 characters on Linux, it's good to
+//    choose a short name here.
+//    Required.
+//
+// trace_metric_prefix: used to prefix the names of TraceMetric counters.
+//    When a task on a thread pool has an associated trace, the thread pool
+//    implementation will increment TraceMetric counters to indicate the
+//    amount of time spent waiting in the queue as well as the amount of wall
+//    and CPU time spent executing. By default, these counters are prefixed
+//    with the name of the thread pool. For example, if the pool is named
+//    'apply', then counters such as 'apply.queue_time_us' will be
+//    incremented.
+//
+//    The TraceMetrics implementation relies on the number of distinct counter
+//    names being small. Thus, if the thread pool name itself is dynamically
+//    generated, the default behavior described above would result in an
+//    unbounded number of distinct cunter names. The 'trace_metric_prefix'
+//    setting can be used to override the prefix used in generating the trace
+//    metric names.
+//
+//    For example, the Raft thread pools are named "<tablet id>-raft" which
+//    has unbounded cardinality (a server may have thousands of different
+//    tablet IDs over its lifetime). In that case, setting the prefix to
+//    "raft" will avoid any issues.
+//
+// min_threads: Minimum number of threads we'll have at any time.
+//    Default: 0.
+//
+// max_threads: Maximum number of threads we'll have at any time.
+//    Default: Number of CPUs detected on the system.
+//
+// max_queue_size: Maximum number of items to enqueue before returning a
+//    Status::ServiceUnavailable message from Submit().
+//    Default: INT_MAX.
+//
+// idle_timeout: How long we'll keep around an idle thread before timing it 
out.
+//    We always keep at least min_threads.
+//    Default: 500 milliseconds.
+//
+class ThreadPoolBuilder {
+ public:
+  explicit ThreadPoolBuilder(std::string name);
+
+  // Note: We violate the style guide by returning mutable references here
+  // in order to provide traditional Builder pattern conveniences.
+  ThreadPoolBuilder& set_trace_metric_prefix(const std::string& prefix);
+  ThreadPoolBuilder& set_min_threads(int min_threads);
+  ThreadPoolBuilder& set_max_threads(int max_threads);
+  ThreadPoolBuilder& set_max_queue_size(int max_queue_size);
+  ThreadPoolBuilder& set_idle_timeout(const MonoDelta& idle_timeout);
+
+  const std::string& name() const { return name_; }
+  int min_threads() const { return min_threads_; }
+  int max_threads() const { return max_threads_; }
+  int max_queue_size() const { return max_queue_size_; }
+  const MonoDelta& idle_timeout() const { return idle_timeout_; }
+
+  // Instantiate a new ThreadPool with the existing builder arguments.
+  Status Build(gscoped_ptr<ThreadPool>* pool) const;
+
+ private:
+  friend class ThreadPool;
+  const std::string name_;
+  std::string trace_metric_prefix_;
+  int min_threads_;
+  int max_threads_;
+  int max_queue_size_;
+  MonoDelta idle_timeout_;
+
+  DISALLOW_COPY_AND_ASSIGN(ThreadPoolBuilder);
+};
+
+// Thread pool with a variable number of threads.
+// The pool can execute a class that implements the Runnable interface, or a
+// boost::function, which can be obtained via boost::bind().
+//
+// Usage Example:
+//    static void Func(int n) { ... }
+//    class Task : public Runnable { ... }
+//
+//    gscoped_ptr<ThreadPool> thread_pool;
+//    CHECK_OK(
+//        ThreadPoolBuilder("my_pool")
+//            .set_min_threads(0)
+//            .set_max_threads(5)
+//            .set_max_queue_size(10)
+//            .set_idle_timeout(MonoDelta::FromMilliseconds(2000))
+//            .Build(&thread_pool));
+//    thread_pool->Submit(shared_ptr<Runnable>(new Task()));
+//    thread_pool->Submit(boost::bind(&Func, 10));
+class ThreadPool {
+ public:
+  ~ThreadPool();
+
+  // Wait for the running tasks to complete and then shutdown the threads.
+  // All the other pending tasks in the queue will be removed.
+  // NOTE: That the user may implement an external abort logic for the
+  //       runnables, that must be called before Shutdown(), if the system
+  //       should know about the non-execution of these tasks, or the runnable
+  //       require an explicit "abort" notification to exit from the run loop.
+  void Shutdown();
+
+  // Submit a function using the kudu Closure system.
+  Status SubmitClosure(const Closure& task) WARN_UNUSED_RESULT;
+
+  // Submit a function binded using boost::bind(&FuncName, args...)
+  Status SubmitFunc(boost::function<void()> func) WARN_UNUSED_RESULT;
+
+  // Submit a Runnable class
+  Status Submit(std::shared_ptr<Runnable> task) WARN_UNUSED_RESULT;
+
+  // Wait until all the tasks are completed.
+  void Wait();
+
+  // Waits for the pool to reach the idle state, or until 'until' time is 
reached.
+  // Returns true if the pool reached the idle state, false otherwise.
+  bool WaitUntil(const MonoTime& until);
+
+  // Waits for the pool to reach the idle state, or until 'delta' time elapses.
+  // Returns true if the pool reached the idle state, false otherwise.
+  bool WaitFor(const MonoDelta& delta);
+
+  // Return the current number of tasks waiting in the queue.
+  // Typically used for metrics.
+  int queue_length() const {
+    return ANNOTATE_UNPROTECTED_READ(queue_size_);
+  }
+
+  // Attach a histogram which measures the queue length seen by tasks when 
they enter
+  // the thread pool's queue.
+  void SetQueueLengthHistogram(const scoped_refptr<Histogram>& hist);
+
+  // Attach a histogram which measures the amount of time that tasks spend 
waiting in
+  // the queue.
+  void SetQueueTimeMicrosHistogram(const scoped_refptr<Histogram>& hist);
+
+  // Attach a histogram which measures the amount of time that tasks spend 
running.
+  void SetRunTimeMicrosHistogram(const scoped_refptr<Histogram>& hist);
+
+ private:
+  friend class ThreadPoolBuilder;
+
+  // Create a new thread pool using a builder.
+  explicit ThreadPool(const ThreadPoolBuilder& builder);
+
+  // Initialize the thread pool by starting the minimum number of threads.
+  Status Init();
+
+  // Dispatcher responsible for dequeueing and executing the tasks
+  void DispatchThread(bool permanent);
+
+  // Create new thread. Required that lock_ is held.
+  Status CreateThreadUnlocked();
+
+  // Aborts if the current thread is a member of this thread pool.
+  void CheckNotPoolThreadUnlocked();
+
+ private:
+  FRIEND_TEST(ThreadPoolTest, TestThreadPoolWithNoMinimum);
+  FRIEND_TEST(ThreadPoolTest, TestVariableSizeThreadPool);
+
+  struct QueueEntry {
+    std::shared_ptr<Runnable> runnable;
+    Trace* trace;
+
+    // Time at which the entry was submitted to the pool.
+    MonoTime submit_time;
+  };
+
+  const std::string name_;
+  const int min_threads_;
+  const int max_threads_;
+  const int max_queue_size_;
+  const MonoDelta idle_timeout_;
+
+  Status pool_status_;
+  Mutex lock_;
+  ConditionVariable idle_cond_;
+  ConditionVariable no_threads_cond_;
+  ConditionVariable not_empty_;
+  int num_threads_;
+  int active_threads_;
+  int queue_size_;
+  std::list<QueueEntry> queue_;
+
+  // Pointers to all running threads. Raw pointers are safe because a Thread
+  // may only go out of scope after being removed from threads_.
+  //
+  // Protected by lock_.
+  std::unordered_set<Thread*> threads_;
+
+  scoped_refptr<Histogram> queue_length_histogram_;
+  scoped_refptr<Histogram> queue_time_us_histogram_;
+  scoped_refptr<Histogram> run_time_us_histogram_;
+
+  const char* queue_time_trace_metric_name_;
+  const char* run_wall_time_trace_metric_name_;
+  const char* run_cpu_time_trace_metric_name_;
+
+  DISALLOW_COPY_AND_ASSIGN(ThreadPool);
+};
+
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/throttler-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/throttler-test.cc 
b/be/src/kudu/util/throttler-test.cc
new file mode 100644
index 0000000..57563d2
--- /dev/null
+++ b/be/src/kudu/util/throttler-test.cc
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/throttler.h"
+
+#include <algorithm>
+#include <cmath>
+#include <cstring>
+
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+
+class ThrottlerTest : public KuduTest {
+};
+
+TEST_F(ThrottlerTest, TestOpThrottle) {
+  // Check operation rate throttling
+  MonoTime now = MonoTime::Now();
+  Throttler t0(now, 1000, 1000*1000, 1);
+  // Fill up bucket
+  now += MonoDelta::FromMilliseconds(2000);
+  // Check throttle behavior for 1 second.
+  for (int p = 0; p < 10; p++) {
+    for (int i = 0; i < 100; i++) {
+      ASSERT_TRUE(t0.Take(now, 1, 1));
+    }
+    ASSERT_FALSE(t0.Take(now, 1, 1));
+    now += MonoDelta::FromMilliseconds(100);
+  }
+}
+
+TEST_F(ThrottlerTest, TestIOThrottle) {
+  // Check operation rate throttling
+  MonoTime now = MonoTime::Now();
+  Throttler t0(now, 50000, 1000*1000, 1);
+  // Fill up bucket
+  now += MonoDelta::FromMilliseconds(2000);
+  // Check throttle behavior for 1 second.
+  for (int p = 0; p < 10; p++) {
+    for (int i = 0; i < 100; i++) {
+      ASSERT_TRUE(t0.Take(now, 1, 1000));
+    }
+    ASSERT_FALSE(t0.Take(now, 1, 1000));
+    now += MonoDelta::FromMilliseconds(100);
+  }
+}
+
+TEST_F(ThrottlerTest, TestBurst) {
+  // Check IO rate throttling
+  MonoTime now = MonoTime::Now();
+  Throttler t0(now, 2000, 1000*1000, 5);
+  // Fill up bucket
+  now += MonoDelta::FromMilliseconds(2000);
+  for (int i = 0; i < 100; i++) {
+    now += MonoDelta::FromMilliseconds(1);
+    ASSERT_TRUE(t0.Take(now, 1, 5000));
+  }
+  ASSERT_TRUE(t0.Take(now, 1, 100000));
+  ASSERT_FALSE(t0.Take(now, 1, 1));
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/throttler.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/throttler.cc b/be/src/kudu/util/throttler.cc
new file mode 100644
index 0000000..108d5db
--- /dev/null
+++ b/be/src/kudu/util/throttler.cc
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/throttler.h"
+
+#include <mutex>
+
+namespace kudu {
+
+Throttler::Throttler(MonoTime now, uint64_t op_rate, uint64_t byte_rate, 
double burst_factor) :
+    next_refill_(now) {
+  op_refill_ = op_rate / (MonoTime::kMicrosecondsPerSecond / 
kRefillPeriodMicros);
+  op_token_ = 0;
+  op_token_max_ = static_cast<uint64_t>(op_refill_ * burst_factor);
+  byte_refill_ = byte_rate / (MonoTime::kMicrosecondsPerSecond / 
kRefillPeriodMicros);
+  byte_token_ = 0;
+  byte_token_max_ = static_cast<uint64_t>(byte_refill_ * burst_factor);
+}
+
+bool Throttler::Take(MonoTime now, uint64_t op, uint64_t byte) {
+  if (op_refill_ == 0 && byte_refill_ == 0) {
+    return true;
+  }
+  std::lock_guard<simple_spinlock> lock(lock_);
+  Refill(now);
+  if ((op_refill_ == 0 || op <= op_token_) &&
+      (byte_refill_ == 0 || byte <= byte_token_)) {
+    if (op_refill_ > 0) {
+      op_token_ -= op;
+    }
+    if (byte_refill_ > 0) {
+      byte_token_ -= byte;
+    }
+    return true;
+  }
+  return false;
+}
+
+void Throttler::Refill(MonoTime now) {
+  int64_t d = (now - next_refill_).ToMicroseconds();
+  if (d < 0) {
+    return;
+  }
+  uint64_t num_period = d / kRefillPeriodMicros + 1;
+  next_refill_ += MonoDelta::FromMicroseconds(num_period * 
kRefillPeriodMicros);
+  op_token_ += num_period * op_refill_;
+  op_token_ = std::min(op_token_, op_token_max_);
+  byte_token_ += num_period * byte_refill_;
+  byte_token_ = std::min(byte_token_, byte_token_max_);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/throttler.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/throttler.h b/be/src/kudu/util/throttler.h
new file mode 100644
index 0000000..7be7ba1
--- /dev/null
+++ b/be/src/kudu/util/throttler.h
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_THROTTLER_H
+#define KUDU_UTIL_THROTTLER_H
+
+#include <algorithm>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+
+namespace kudu {
+
+// A throttler to throttle both operation/s and IO byte/s.
+class Throttler {
+ public:
+  // Refill period is 100ms.
+  enum {
+    kRefillPeriodMicros = 100000
+  };
+
+  // Construct a throttler with max operation per second, max IO bytes per 
second
+  // and burst factor (burst_rate = rate * burst), burst rate means maximum
+  // throughput within one refill period.
+  // Set op_per_sec to 0 to disable operation throttling.
+  // Set byte_per_sec to 0 to disable IO bytes throttling.
+  Throttler(MonoTime now, uint64_t op_per_sec, uint64_t byte_per_sec, double 
burst_factor);
+
+  // Throttle an "operation group" by taking 'op' operation tokens and 'byte' 
byte tokens.
+  // Return true if there are enough tokens, and operation is allowed.
+  // Return false if there are not enough tokens, and operation is throttled.
+  bool Take(MonoTime now, uint64_t op, uint64_t byte);
+
+ private:
+  void Refill(MonoTime now);
+
+  MonoTime next_refill_;
+  uint64_t op_refill_;
+  uint64_t op_token_;
+  uint64_t op_token_max_;
+  uint64_t byte_refill_;
+  uint64_t byte_token_;
+  uint64_t byte_token_max_;
+  simple_spinlock lock_;
+};
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/trace-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/trace-test.cc b/be/src/kudu/util/trace-test.cc
new file mode 100644
index 0000000..6c38b0c
--- /dev/null
+++ b/be/src/kudu/util/trace-test.cc
@@ -0,0 +1,845 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <glog/stl_logging.h>
+#include <string>
+#include <rapidjson/document.h>
+#include <rapidjson/rapidjson.h>
+
+#include "kudu/util/trace.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/debug/trace_event_synthetic_delay.h"
+#include "kudu/util/debug/trace_logging.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_util.h"
+
+using kudu::debug::TraceLog;
+using kudu::debug::TraceResultBuffer;
+using kudu::debug::CategoryFilter;
+using rapidjson::Document;
+using rapidjson::Value;
+using std::string;
+using std::vector;
+
+namespace kudu {
+
+class TraceTest : public KuduTest {
+};
+
+// Replace all digits in 's' with the character 'X'.
+static string XOutDigits(const string& s) {
+  string ret;
+  ret.reserve(s.size());
+  for (char c : s) {
+    if (isdigit(c)) {
+      ret.push_back('X');
+    } else {
+      ret.push_back(c);
+    }
+  }
+  return ret;
+}
+
+TEST_F(TraceTest, TestBasic) {
+  scoped_refptr<Trace> t(new Trace);
+  TRACE_TO(t, "hello $0, $1", "world", 12345);
+  TRACE_TO(t, "goodbye $0, $1", "cruel world", 54321);
+
+  string result = XOutDigits(t->DumpToString(Trace::NO_FLAGS));
+  ASSERT_EQ("XXXX XX:XX:XX.XXXXXX trace-test.cc:XX] hello world, XXXXX\n"
+            "XXXX XX:XX:XX.XXXXXX trace-test.cc:XX] goodbye cruel world, 
XXXXX\n",
+            result);
+}
+
+TEST_F(TraceTest, TestAttach) {
+  scoped_refptr<Trace> traceA(new Trace);
+  scoped_refptr<Trace> traceB(new Trace);
+  {
+    ADOPT_TRACE(traceA.get());
+    EXPECT_EQ(traceA.get(), Trace::CurrentTrace());
+    {
+      ADOPT_TRACE(traceB.get());
+      EXPECT_EQ(traceB.get(), Trace::CurrentTrace());
+      TRACE("hello from traceB");
+    }
+    EXPECT_EQ(traceA.get(), Trace::CurrentTrace());
+    TRACE("hello from traceA");
+  }
+  EXPECT_TRUE(Trace::CurrentTrace() == nullptr);
+  TRACE("this goes nowhere");
+
+  EXPECT_EQ(XOutDigits(traceA->DumpToString(Trace::NO_FLAGS)),
+            "XXXX XX:XX:XX.XXXXXX trace-test.cc:XX] hello from traceA\n");
+  EXPECT_EQ(XOutDigits(traceB->DumpToString(Trace::NO_FLAGS)),
+            "XXXX XX:XX:XX.XXXXXX trace-test.cc:XX] hello from traceB\n");
+}
+
+TEST_F(TraceTest, TestChildTrace) {
+  scoped_refptr<Trace> traceA(new Trace);
+  scoped_refptr<Trace> traceB(new Trace);
+  ADOPT_TRACE(traceA.get());
+  traceA->AddChildTrace("child", traceB.get());
+  TRACE("hello from traceA");
+  {
+    ADOPT_TRACE(traceB.get());
+    TRACE("hello from traceB");
+  }
+  EXPECT_EQ(XOutDigits(traceA->DumpToString(Trace::NO_FLAGS)),
+            "XXXX XX:XX:XX.XXXXXX trace-test.cc:XX] hello from traceA\n"
+            "Related trace 'child':\n"
+            "XXXX XX:XX:XX.XXXXXX trace-test.cc:XXX] hello from traceB\n");
+}
+
+static void GenerateTraceEvents(int thread_id,
+                                int num_events) {
+  for (int i = 0; i < num_events; i++) {
+    TRACE_EVENT1("test", "foo", "thread_id", thread_id);
+  }
+}
+
+// Parse the dumped trace data and return the number of events
+// found within, including only those with the "test" category.
+int ParseAndReturnEventCount(const string& trace_json) {
+  Document d;
+  d.Parse<0>(trace_json.c_str());
+  CHECK(d.IsObject()) << "bad json: " << trace_json;
+  const Value& events_json = d["traceEvents"];
+  CHECK(events_json.IsArray()) << "bad json: " << trace_json;
+
+  // Count how many of our events were seen. We have to filter out
+  // the metadata events.
+  int seen_real_events = 0;
+  for (int i = 0; i < events_json.Size(); i++) {
+    if (events_json[i]["cat"].GetString() == string("test")) {
+      seen_real_events++;
+    }
+  }
+
+  return seen_real_events;
+}
+
+TEST_F(TraceTest, TestChromeTracing) {
+  const int kNumThreads = 4;
+  const int kEventsPerThread = AllowSlowTests() ? 1000000 : 10000;
+
+  TraceLog* tl = TraceLog::GetInstance();
+  tl->SetEnabled(CategoryFilter(CategoryFilter::kDefaultCategoryFilterString),
+                 TraceLog::RECORDING_MODE,
+                 TraceLog::RECORD_CONTINUOUSLY);
+
+  vector<scoped_refptr<Thread> > threads(kNumThreads);
+
+  Stopwatch s;
+  s.start();
+  for (int i = 0; i < kNumThreads; i++) {
+    CHECK_OK(Thread::Create("test", "gen-traces", &GenerateTraceEvents, i, 
kEventsPerThread,
+                            &threads[i]));
+  }
+
+  for (int i = 0; i < kNumThreads; i++) {
+    threads[i]->Join();
+  }
+  tl->SetDisabled();
+
+  int total_events = kNumThreads * kEventsPerThread;
+  double elapsed = s.elapsed().wall_seconds();
+
+  LOG(INFO) << "Trace performance: " << static_cast<int>(total_events / 
elapsed) << " traces/sec";
+
+  string trace_json = TraceResultBuffer::FlushTraceLogToString();
+
+  // Verify that the JSON contains events. It won't have exactly
+  // kEventsPerThread * kNumThreads because the trace buffer isn't large enough
+  // for that.
+  ASSERT_GE(ParseAndReturnEventCount(trace_json), 100);
+}
+
+// Test that, if a thread exits before filling a full trace buffer, we still
+// see its results. This is a regression test for a bug in the earlier 
integration
+// of Chromium tracing into Kudu.
+TEST_F(TraceTest, TestTraceFromExitedThread) {
+  TraceLog* tl = TraceLog::GetInstance();
+  tl->SetEnabled(CategoryFilter(CategoryFilter::kDefaultCategoryFilterString),
+                 TraceLog::RECORDING_MODE,
+                 TraceLog::RECORD_CONTINUOUSLY);
+
+  // Generate 10 trace events in a separate thread.
+  int kNumEvents = 10;
+  scoped_refptr<Thread> t;
+  CHECK_OK(Thread::Create("test", "gen-traces", &GenerateTraceEvents, 1, 
kNumEvents,
+                          &t));
+  t->Join();
+  tl->SetDisabled();
+  string trace_json = TraceResultBuffer::FlushTraceLogToString();
+  LOG(INFO) << trace_json;
+
+  // Verify that the buffer contains 10 trace events
+  ASSERT_EQ(10, ParseAndReturnEventCount(trace_json));
+}
+
+static void GenerateWideSpan() {
+  TRACE_EVENT0("test", "GenerateWideSpan");
+  for (int i = 0; i < 1000; i++) {
+    TRACE_EVENT0("test", "InnerLoop");
+  }
+}
+
+// Test creating a trace event which contains many other trace events.
+// This ensures that we can go back and update a TraceEvent which fell in
+// a different trace chunk.
+TEST_F(TraceTest, TestWideSpan) {
+  TraceLog* tl = TraceLog::GetInstance();
+  tl->SetEnabled(CategoryFilter(CategoryFilter::kDefaultCategoryFilterString),
+                 TraceLog::RECORDING_MODE,
+                 TraceLog::RECORD_CONTINUOUSLY);
+
+  scoped_refptr<Thread> t;
+  CHECK_OK(Thread::Create("test", "gen-traces", &GenerateWideSpan, &t));
+  t->Join();
+  tl->SetDisabled();
+
+  string trace_json = TraceResultBuffer::FlushTraceLogToString();
+  ASSERT_EQ(1001, ParseAndReturnEventCount(trace_json));
+}
+
+// Regression test for KUDU-753: faulty JSON escaping when dealing with
+// single quote characters.
+TEST_F(TraceTest, TestJsonEncodingString) {
+  TraceLog* tl = TraceLog::GetInstance();
+  tl->SetEnabled(CategoryFilter(CategoryFilter::kDefaultCategoryFilterString),
+                 TraceLog::RECORDING_MODE,
+                 TraceLog::RECORD_CONTINUOUSLY);
+  {
+    TRACE_EVENT1("test", "test", "arg", "this is a test with \"'\"' and 
characters\nand new lines");
+  }
+  tl->SetDisabled();
+  string trace_json = TraceResultBuffer::FlushTraceLogToString();
+  ASSERT_EQ(1, ParseAndReturnEventCount(trace_json));
+}
+
+// Generate trace events continuously until 'latch' fires.
+// Increment *num_events_generated for each event generated.
+void GenerateTracesUntilLatch(AtomicInt<int64_t>* num_events_generated,
+                              CountDownLatch* latch) {
+  while (latch->count()) {
+    {
+      // This goes in its own scope so that the event is fully generated (with
+      // both its START and END times) before we do the counter increment 
below.
+      TRACE_EVENT0("test", "GenerateTracesUntilLatch");
+    }
+    num_events_generated->Increment();
+  }
+}
+
+// Test starting and stopping tracing while a thread is running.
+// This is a regression test for bugs in earlier versions of the imported
+// trace code.
+TEST_F(TraceTest, TestStartAndStopCollection) {
+  TraceLog* tl = TraceLog::GetInstance();
+
+  CountDownLatch latch(1);
+  AtomicInt<int64_t> num_events_generated(0);
+  scoped_refptr<Thread> t;
+  CHECK_OK(Thread::Create("test", "gen-traces", &GenerateTracesUntilLatch,
+                          &num_events_generated, &latch, &t));
+
+  const int num_flushes = AllowSlowTests() ? 50 : 3;
+  for (int i = 0; i < num_flushes; i++) {
+    
tl->SetEnabled(CategoryFilter(CategoryFilter::kDefaultCategoryFilterString),
+                   TraceLog::RECORDING_MODE,
+                   TraceLog::RECORD_CONTINUOUSLY);
+
+    const int64_t num_events_before = num_events_generated.Load();
+    SleepFor(MonoDelta::FromMilliseconds(10));
+    const int64_t num_events_after = num_events_generated.Load();
+    tl->SetDisabled();
+
+    string trace_json = TraceResultBuffer::FlushTraceLogToString();
+    // We might under-count the number of events, since we only measure the 
sleep,
+    // and tracing is enabled before and disabled after we start counting.
+    // We might also over-count by at most 1, because we could enable tracing
+    // right in between creating a trace event and incrementing the counter.
+    // But, we should never over-count by more than 1.
+    int expected_events_lowerbound = num_events_after - num_events_before - 1;
+    int captured_events = ParseAndReturnEventCount(trace_json);
+    ASSERT_GE(captured_events, expected_events_lowerbound);
+  }
+
+  latch.CountDown();
+  t->Join();
+}
+
+TEST_F(TraceTest, TestChromeSampling) {
+  TraceLog* tl = TraceLog::GetInstance();
+  tl->SetEnabled(CategoryFilter(CategoryFilter::kDefaultCategoryFilterString),
+                 TraceLog::RECORDING_MODE,
+                 static_cast<TraceLog::Options>(TraceLog::RECORD_CONTINUOUSLY |
+                                                TraceLog::ENABLE_SAMPLING));
+
+  for (int i = 0; i < 100; i++) {
+    switch (i % 3) {
+      case 0:
+        TRACE_EVENT_SET_SAMPLING_STATE("test", "state-0");
+        break;
+      case 1:
+        TRACE_EVENT_SET_SAMPLING_STATE("test", "state-1");
+        break;
+      case 2:
+        TRACE_EVENT_SET_SAMPLING_STATE("test", "state-2");
+        break;
+    }
+    SleepFor(MonoDelta::FromMilliseconds(1));
+  }
+  tl->SetDisabled();
+  string trace_json = TraceResultBuffer::FlushTraceLogToString();
+  ASSERT_GT(ParseAndReturnEventCount(trace_json), 0);
+}
+
+class TraceEventCallbackTest : public KuduTest {
+ public:
+  virtual void SetUp() OVERRIDE {
+    KuduTest::SetUp();
+    ASSERT_EQ(nullptr, s_instance);
+    s_instance = this;
+  }
+  virtual void TearDown() OVERRIDE {
+    TraceLog::GetInstance()->SetDisabled();
+
+    // Flush the buffer so that one test doesn't end up leaving any
+    // extra results for the next test.
+    TraceResultBuffer::FlushTraceLogToString();
+
+    ASSERT_TRUE(!!s_instance);
+    s_instance = nullptr;
+    KuduTest::TearDown();
+
+  }
+
+ protected:
+  void EndTraceAndFlush() {
+    TraceLog::GetInstance()->SetDisabled();
+    string trace_json = TraceResultBuffer::FlushTraceLogToString();
+    trace_doc_.Parse<0>(trace_json.c_str());
+    LOG(INFO) << trace_json;
+    ASSERT_TRUE(trace_doc_.IsObject());
+    trace_parsed_ = trace_doc_["traceEvents"];
+    ASSERT_TRUE(trace_parsed_.IsArray());
+  }
+
+  void DropTracedMetadataRecords() {
+    // NB: rapidjson has move-semantics, like auto_ptr.
+    Value old_trace_parsed;
+    old_trace_parsed = trace_parsed_;
+    trace_parsed_.SetArray();
+    size_t old_trace_parsed_size = old_trace_parsed.Size();
+
+    for (size_t i = 0; i < old_trace_parsed_size; i++) {
+      Value value;
+      value = old_trace_parsed[i];
+      if (value.GetType() != rapidjson::kObjectType) {
+        trace_parsed_.PushBack(value, trace_doc_.GetAllocator());
+        continue;
+      }
+      string tmp;
+      if (value.HasMember("ph") && strcmp(value["ph"].GetString(), "M") == 0) {
+        continue;
+      }
+
+      trace_parsed_.PushBack(value, trace_doc_.GetAllocator());
+    }
+  }
+
+  // Search through the given array for any dictionary which has a key
+  // or value which has 'string_to_match' as a substring.
+  // Returns the matching dictionary, or NULL.
+  static const Value* FindTraceEntry(
+    const Value& trace_parsed,
+    const char* string_to_match) {
+    // Scan all items
+    size_t trace_parsed_count = trace_parsed.Size();
+    for (size_t i = 0; i < trace_parsed_count; i++) {
+      const Value& value = trace_parsed[i];
+      if (value.GetType() != rapidjson::kObjectType) {
+        continue;
+      }
+
+      for (Value::ConstMemberIterator it = value.MemberBegin();
+           it != value.MemberEnd();
+           ++it) {
+        if (it->name.IsString() && strstr(it->name.GetString(), 
string_to_match) != nullptr) {
+          return &value;
+        }
+        if (it->value.IsString() && strstr(it->value.GetString(), 
string_to_match) != nullptr) {
+          return &value;
+        }
+      }
+    }
+    return nullptr;
+  }
+
+  // For TraceEventCallbackAndRecordingX tests.
+  void VerifyCallbackAndRecordedEvents(size_t expected_callback_count,
+                                       size_t expected_recorded_count) {
+    // Callback events.
+    EXPECT_EQ(expected_callback_count, collected_events_names_.size());
+    for (size_t i = 0; i < collected_events_names_.size(); ++i) {
+      EXPECT_EQ("callback", collected_events_categories_[i]);
+      EXPECT_EQ("yes", collected_events_names_[i]);
+    }
+
+    // Recorded events.
+    EXPECT_EQ(expected_recorded_count, trace_parsed_.Size());
+    EXPECT_TRUE(FindTraceEntry(trace_parsed_, "recording"));
+    EXPECT_FALSE(FindTraceEntry(trace_parsed_, "callback"));
+    EXPECT_TRUE(FindTraceEntry(trace_parsed_, "yes"));
+    EXPECT_FALSE(FindTraceEntry(trace_parsed_, "no"));
+  }
+
+  void VerifyCollectedEvent(size_t i,
+                            unsigned phase,
+                            const string& category,
+                            const string& name) {
+    EXPECT_EQ(phase, collected_events_phases_[i]);
+    EXPECT_EQ(category, collected_events_categories_[i]);
+    EXPECT_EQ(name, collected_events_names_[i]);
+  }
+
+  Document trace_doc_;
+  Value trace_parsed_;
+
+  vector<string> collected_events_categories_;
+  vector<string> collected_events_names_;
+  vector<unsigned char> collected_events_phases_;
+  vector<MicrosecondsInt64> collected_events_timestamps_;
+
+  static TraceEventCallbackTest* s_instance;
+  static void Callback(MicrosecondsInt64 timestamp,
+                       char phase,
+                       const unsigned char* category_group_enabled,
+                       const char* name,
+                       uint64_t id,
+                       int num_args,
+                       const char* const arg_names[],
+                       const unsigned char arg_types[],
+                       const uint64_t arg_values[],
+                       unsigned char flags) {
+    s_instance->collected_events_phases_.push_back(phase);
+    s_instance->collected_events_categories_.push_back(
+        TraceLog::GetCategoryGroupName(category_group_enabled));
+    s_instance->collected_events_names_.push_back(name);
+    s_instance->collected_events_timestamps_.push_back(timestamp);
+  }
+};
+
+TraceEventCallbackTest* TraceEventCallbackTest::s_instance;
+
+TEST_F(TraceEventCallbackTest, TraceEventCallback) {
+  TRACE_EVENT_INSTANT0("all", "before enable", TRACE_EVENT_SCOPE_THREAD);
+  TraceLog::GetInstance()->SetEventCallbackEnabled(
+      CategoryFilter("*"), Callback);
+  TRACE_EVENT_INSTANT0("all", "event1", TRACE_EVENT_SCOPE_GLOBAL);
+  TRACE_EVENT_INSTANT0("all", "event2", TRACE_EVENT_SCOPE_GLOBAL);
+  {
+    TRACE_EVENT0("all", "duration");
+    TRACE_EVENT_INSTANT0("all", "event3", TRACE_EVENT_SCOPE_GLOBAL);
+  }
+  TraceLog::GetInstance()->SetEventCallbackDisabled();
+  TRACE_EVENT_INSTANT0("all", "after callback removed",
+                       TRACE_EVENT_SCOPE_GLOBAL);
+  ASSERT_EQ(5u, collected_events_names_.size());
+  EXPECT_EQ("event1", collected_events_names_[0]);
+  EXPECT_EQ(TRACE_EVENT_PHASE_INSTANT, collected_events_phases_[0]);
+  EXPECT_EQ("event2", collected_events_names_[1]);
+  EXPECT_EQ(TRACE_EVENT_PHASE_INSTANT, collected_events_phases_[1]);
+  EXPECT_EQ("duration", collected_events_names_[2]);
+  EXPECT_EQ(TRACE_EVENT_PHASE_BEGIN, collected_events_phases_[2]);
+  EXPECT_EQ("event3", collected_events_names_[3]);
+  EXPECT_EQ(TRACE_EVENT_PHASE_INSTANT, collected_events_phases_[3]);
+  EXPECT_EQ("duration", collected_events_names_[4]);
+  EXPECT_EQ(TRACE_EVENT_PHASE_END, collected_events_phases_[4]);
+  for (size_t i = 1; i < collected_events_timestamps_.size(); i++) {
+    EXPECT_LE(collected_events_timestamps_[i - 1],
+              collected_events_timestamps_[i]);
+  }
+}
+
+TEST_F(TraceEventCallbackTest, TraceEventCallbackWhileFull) {
+  TraceLog::GetInstance()->SetEnabled(
+      CategoryFilter("*"),
+      TraceLog::RECORDING_MODE,
+      TraceLog::RECORD_UNTIL_FULL);
+  do {
+    TRACE_EVENT_INSTANT0("all", "badger badger", TRACE_EVENT_SCOPE_GLOBAL);
+  } while (!TraceLog::GetInstance()->BufferIsFull());
+  TraceLog::GetInstance()->SetEventCallbackEnabled(CategoryFilter("*"),
+                                                   Callback);
+  TRACE_EVENT_INSTANT0("all", "a snake", TRACE_EVENT_SCOPE_GLOBAL);
+  TraceLog::GetInstance()->SetEventCallbackDisabled();
+  ASSERT_EQ(1u, collected_events_names_.size());
+  EXPECT_EQ("a snake", collected_events_names_[0]);
+}
+
+// 1: Enable callback, enable recording, disable callback, disable recording.
+TEST_F(TraceEventCallbackTest, TraceEventCallbackAndRecording1) {
+  TRACE_EVENT_INSTANT0("recording", "no", TRACE_EVENT_SCOPE_GLOBAL);
+  TRACE_EVENT_INSTANT0("callback", "no", TRACE_EVENT_SCOPE_GLOBAL);
+  TraceLog::GetInstance()->SetEventCallbackEnabled(CategoryFilter("callback"),
+                                                   Callback);
+  TRACE_EVENT_INSTANT0("recording", "no", TRACE_EVENT_SCOPE_GLOBAL);
+  TRACE_EVENT_INSTANT0("callback", "yes", TRACE_EVENT_SCOPE_GLOBAL);
+  TraceLog::GetInstance()->SetEnabled(
+      CategoryFilter("recording"),
+      TraceLog::RECORDING_MODE,
+      TraceLog::RECORD_UNTIL_FULL);
+  TRACE_EVENT_INSTANT0("recording", "yes", TRACE_EVENT_SCOPE_GLOBAL);
+  TRACE_EVENT_INSTANT0("callback", "yes", TRACE_EVENT_SCOPE_GLOBAL);
+  TraceLog::GetInstance()->SetEventCallbackDisabled();
+  TRACE_EVENT_INSTANT0("recording", "yes", TRACE_EVENT_SCOPE_GLOBAL);
+  TRACE_EVENT_INSTANT0("callback", "no", TRACE_EVENT_SCOPE_GLOBAL);
+  EndTraceAndFlush();
+  TRACE_EVENT_INSTANT0("recording", "no", TRACE_EVENT_SCOPE_GLOBAL);
+  TRACE_EVENT_INSTANT0("callback", "no", TRACE_EVENT_SCOPE_GLOBAL);
+
+  DropTracedMetadataRecords();
+  ASSERT_NO_FATAL_FAILURE();
+  VerifyCallbackAndRecordedEvents(2, 2);
+}
+
+// 2: Enable callback, enable recording, disable recording, disable callback.
+TEST_F(TraceEventCallbackTest, TraceEventCallbackAndRecording2) {
+  TRACE_EVENT_INSTANT0("recording", "no", TRACE_EVENT_SCOPE_GLOBAL);
+  TRACE_EVENT_INSTANT0("callback", "no", TRACE_EVENT_SCOPE_GLOBAL);
+  TraceLog::GetInstance()->SetEventCallbackEnabled(CategoryFilter("callback"),
+                                                   Callback);
+  TRACE_EVENT_INSTANT0("recording", "no", TRACE_EVENT_SCOPE_GLOBAL);
+  TRACE_EVENT_INSTANT0("callback", "yes", TRACE_EVENT_SCOPE_GLOBAL);
+  TraceLog::GetInstance()->SetEnabled(
+      CategoryFilter("recording"),
+      TraceLog::RECORDING_MODE,
+      TraceLog::RECORD_UNTIL_FULL);
+  TRACE_EVENT_INSTANT0("recording", "yes", TRACE_EVENT_SCOPE_GLOBAL);
+  TRACE_EVENT_INSTANT0("callback", "yes", TRACE_EVENT_SCOPE_GLOBAL);
+  EndTraceAndFlush();
+  TRACE_EVENT_INSTANT0("recording", "no", TRACE_EVENT_SCOPE_GLOBAL);
+  TRACE_EVENT_INSTANT0("callback", "yes", TRACE_EVENT_SCOPE_GLOBAL);
+  TraceLog::GetInstance()->SetEventCallbackDisabled();
+  TRACE_EVENT_INSTANT0("recording", "no", TRACE_EVENT_SCOPE_GLOBAL);
+  TRACE_EVENT_INSTANT0("callback", "no", TRACE_EVENT_SCOPE_GLOBAL);
+
+  DropTracedMetadataRecords();
+  VerifyCallbackAndRecordedEvents(3, 1);
+}
+
+// 3: Enable recording, enable callback, disable callback, disable recording.
+TEST_F(TraceEventCallbackTest, TraceEventCallbackAndRecording3) {
+  TRACE_EVENT_INSTANT0("recording", "no", TRACE_EVENT_SCOPE_GLOBAL);
+  TRACE_EVENT_INSTANT0("callback", "no", TRACE_EVENT_SCOPE_GLOBAL);
+  TraceLog::GetInstance()->SetEnabled(
+      CategoryFilter("recording"),
+      TraceLog::RECORDING_MODE,
+      TraceLog::RECORD_UNTIL_FULL);
+  TRACE_EVENT_INSTANT0("recording", "yes", TRACE_EVENT_SCOPE_GLOBAL);
+  TRACE_EVENT_INSTANT0("callback", "no", TRACE_EVENT_SCOPE_GLOBAL);
+  TraceLog::GetInstance()->SetEventCallbackEnabled(CategoryFilter("callback"),
+                                                   Callback);
+  TRACE_EVENT_INSTANT0("recording", "yes", TRACE_EVENT_SCOPE_GLOBAL);
+  TRACE_EVENT_INSTANT0("callback", "yes", TRACE_EVENT_SCOPE_GLOBAL);
+  TraceLog::GetInstance()->SetEventCallbackDisabled();
+  TRACE_EVENT_INSTANT0("recording", "yes", TRACE_EVENT_SCOPE_GLOBAL);
+  TRACE_EVENT_INSTANT0("callback", "no", TRACE_EVENT_SCOPE_GLOBAL);
+  EndTraceAndFlush();
+  TRACE_EVENT_INSTANT0("recording", "no", TRACE_EVENT_SCOPE_GLOBAL);
+  TRACE_EVENT_INSTANT0("callback", "no", TRACE_EVENT_SCOPE_GLOBAL);
+
+  DropTracedMetadataRecords();
+  VerifyCallbackAndRecordedEvents(1, 3);
+}
+
+// 4: Enable recording, enable callback, disable recording, disable callback.
+TEST_F(TraceEventCallbackTest, TraceEventCallbackAndRecording4) {
+  TRACE_EVENT_INSTANT0("recording", "no", TRACE_EVENT_SCOPE_GLOBAL);
+  TRACE_EVENT_INSTANT0("callback", "no", TRACE_EVENT_SCOPE_GLOBAL);
+  TraceLog::GetInstance()->SetEnabled(
+      CategoryFilter("recording"),
+      TraceLog::RECORDING_MODE,
+      TraceLog::RECORD_UNTIL_FULL);
+  TRACE_EVENT_INSTANT0("recording", "yes", TRACE_EVENT_SCOPE_GLOBAL);
+  TRACE_EVENT_INSTANT0("callback", "no", TRACE_EVENT_SCOPE_GLOBAL);
+  TraceLog::GetInstance()->SetEventCallbackEnabled(CategoryFilter("callback"),
+                                                   Callback);
+  TRACE_EVENT_INSTANT0("recording", "yes", TRACE_EVENT_SCOPE_GLOBAL);
+  TRACE_EVENT_INSTANT0("callback", "yes", TRACE_EVENT_SCOPE_GLOBAL);
+  EndTraceAndFlush();
+  TRACE_EVENT_INSTANT0("recording", "no", TRACE_EVENT_SCOPE_GLOBAL);
+  TRACE_EVENT_INSTANT0("callback", "yes", TRACE_EVENT_SCOPE_GLOBAL);
+  TraceLog::GetInstance()->SetEventCallbackDisabled();
+  TRACE_EVENT_INSTANT0("recording", "no", TRACE_EVENT_SCOPE_GLOBAL);
+  TRACE_EVENT_INSTANT0("callback", "no", TRACE_EVENT_SCOPE_GLOBAL);
+
+  DropTracedMetadataRecords();
+  VerifyCallbackAndRecordedEvents(2, 2);
+}
+
+TEST_F(TraceEventCallbackTest, TraceEventCallbackAndRecordingDuration) {
+  TraceLog::GetInstance()->SetEventCallbackEnabled(CategoryFilter("*"),
+                                                   Callback);
+  {
+    TRACE_EVENT0("callback", "duration1");
+    TraceLog::GetInstance()->SetEnabled(
+        CategoryFilter("*"),
+        TraceLog::RECORDING_MODE,
+        TraceLog::RECORD_UNTIL_FULL);
+    TRACE_EVENT0("callback", "duration2");
+    EndTraceAndFlush();
+    TRACE_EVENT0("callback", "duration3");
+  }
+  TraceLog::GetInstance()->SetEventCallbackDisabled();
+
+  ASSERT_EQ(6u, collected_events_names_.size());
+  VerifyCollectedEvent(0, TRACE_EVENT_PHASE_BEGIN, "callback", "duration1");
+  VerifyCollectedEvent(1, TRACE_EVENT_PHASE_BEGIN, "callback", "duration2");
+  VerifyCollectedEvent(2, TRACE_EVENT_PHASE_BEGIN, "callback", "duration3");
+  VerifyCollectedEvent(3, TRACE_EVENT_PHASE_END, "callback", "duration3");
+  VerifyCollectedEvent(4, TRACE_EVENT_PHASE_END, "callback", "duration2");
+  VerifyCollectedEvent(5, TRACE_EVENT_PHASE_END, "callback", "duration1");
+}
+
+////////////////////////////////////////////////////////////
+// Tests for synthetic delay
+// (from chromium-base/debug/trace_event_synthetic_delay_unittest.cc)
+////////////////////////////////////////////////////////////
+
+namespace {
+
+const int kTargetDurationMs = 100;
+// Allow some leeway in timings to make it possible to run these tests with a
+// wall clock time source too.
+const int kShortDurationMs = 10;
+
+}  // namespace
+
+namespace debug {
+
+class TraceEventSyntheticDelayTest : public KuduTest,
+                                     public TraceEventSyntheticDelayClock {
+ public:
+  TraceEventSyntheticDelayTest() {
+    now_ = MonoTime::Min();
+  }
+
+  virtual ~TraceEventSyntheticDelayTest() {
+    ResetTraceEventSyntheticDelays();
+  }
+
+  // TraceEventSyntheticDelayClock implementation.
+  virtual MonoTime Now() OVERRIDE {
+    AdvanceTime(MonoDelta::FromMilliseconds(kShortDurationMs / 10));
+    return now_;
+  }
+
+  TraceEventSyntheticDelay* ConfigureDelay(const char* name) {
+    TraceEventSyntheticDelay* delay = TraceEventSyntheticDelay::Lookup(name);
+    delay->SetClock(this);
+    delay->SetTargetDuration(
+      MonoDelta::FromMilliseconds(kTargetDurationMs));
+    return delay;
+  }
+
+  void AdvanceTime(MonoDelta delta) { now_ += delta; }
+
+  int TestFunction() {
+    MonoTime start = Now();
+    { TRACE_EVENT_SYNTHETIC_DELAY("test.Delay"); }
+    MonoTime end = Now();
+    return (end - start).ToMilliseconds();
+  }
+
+  int AsyncTestFunctionBegin() {
+    MonoTime start = Now();
+    { TRACE_EVENT_SYNTHETIC_DELAY_BEGIN("test.AsyncDelay"); }
+    MonoTime end = Now();
+    return (end - start).ToMilliseconds();
+  }
+
+  int AsyncTestFunctionEnd() {
+    MonoTime start = Now();
+    { TRACE_EVENT_SYNTHETIC_DELAY_END("test.AsyncDelay"); }
+    MonoTime end = Now();
+    return (end - start).ToMilliseconds();
+  }
+
+ private:
+  MonoTime now_;
+
+  DISALLOW_COPY_AND_ASSIGN(TraceEventSyntheticDelayTest);
+};
+
+TEST_F(TraceEventSyntheticDelayTest, StaticDelay) {
+  TraceEventSyntheticDelay* delay = ConfigureDelay("test.Delay");
+  delay->SetMode(TraceEventSyntheticDelay::STATIC);
+  EXPECT_GE(TestFunction(), kTargetDurationMs);
+}
+
+TEST_F(TraceEventSyntheticDelayTest, OneShotDelay) {
+  TraceEventSyntheticDelay* delay = ConfigureDelay("test.Delay");
+  delay->SetMode(TraceEventSyntheticDelay::ONE_SHOT);
+  EXPECT_GE(TestFunction(), kTargetDurationMs);
+  EXPECT_LT(TestFunction(), kShortDurationMs);
+
+  delay->SetTargetDuration(
+      MonoDelta::FromMilliseconds(kTargetDurationMs));
+  EXPECT_GE(TestFunction(), kTargetDurationMs);
+}
+
+TEST_F(TraceEventSyntheticDelayTest, AlternatingDelay) {
+  TraceEventSyntheticDelay* delay = ConfigureDelay("test.Delay");
+  delay->SetMode(TraceEventSyntheticDelay::ALTERNATING);
+  EXPECT_GE(TestFunction(), kTargetDurationMs);
+  EXPECT_LT(TestFunction(), kShortDurationMs);
+  EXPECT_GE(TestFunction(), kTargetDurationMs);
+  EXPECT_LT(TestFunction(), kShortDurationMs);
+}
+
+TEST_F(TraceEventSyntheticDelayTest, AsyncDelay) {
+  ConfigureDelay("test.AsyncDelay");
+  EXPECT_LT(AsyncTestFunctionBegin(), kShortDurationMs);
+  EXPECT_GE(AsyncTestFunctionEnd(), kTargetDurationMs / 2);
+}
+
+TEST_F(TraceEventSyntheticDelayTest, AsyncDelayExceeded) {
+  ConfigureDelay("test.AsyncDelay");
+  EXPECT_LT(AsyncTestFunctionBegin(), kShortDurationMs);
+  AdvanceTime(MonoDelta::FromMilliseconds(kTargetDurationMs));
+  EXPECT_LT(AsyncTestFunctionEnd(), kShortDurationMs);
+}
+
+TEST_F(TraceEventSyntheticDelayTest, AsyncDelayNoActivation) {
+  ConfigureDelay("test.AsyncDelay");
+  EXPECT_LT(AsyncTestFunctionEnd(), kShortDurationMs);
+}
+
+TEST_F(TraceEventSyntheticDelayTest, AsyncDelayNested) {
+  ConfigureDelay("test.AsyncDelay");
+  EXPECT_LT(AsyncTestFunctionBegin(), kShortDurationMs);
+  EXPECT_LT(AsyncTestFunctionBegin(), kShortDurationMs);
+  EXPECT_LT(AsyncTestFunctionEnd(), kShortDurationMs);
+  EXPECT_GE(AsyncTestFunctionEnd(), kTargetDurationMs / 2);
+}
+
+TEST_F(TraceEventSyntheticDelayTest, AsyncDelayUnbalanced) {
+  ConfigureDelay("test.AsyncDelay");
+  EXPECT_LT(AsyncTestFunctionBegin(), kShortDurationMs);
+  EXPECT_GE(AsyncTestFunctionEnd(), kTargetDurationMs / 2);
+  EXPECT_LT(AsyncTestFunctionEnd(), kShortDurationMs);
+
+  EXPECT_LT(AsyncTestFunctionBegin(), kShortDurationMs);
+  EXPECT_GE(AsyncTestFunctionEnd(), kTargetDurationMs / 2);
+}
+
+TEST_F(TraceEventSyntheticDelayTest, ResetDelays) {
+  ConfigureDelay("test.Delay");
+  ResetTraceEventSyntheticDelays();
+  EXPECT_LT(TestFunction(), kShortDurationMs);
+}
+
+TEST_F(TraceEventSyntheticDelayTest, BeginParallel) {
+  TraceEventSyntheticDelay* delay = ConfigureDelay("test.AsyncDelay");
+  MonoTime end_times[2];
+  MonoTime start_time = Now();
+
+  delay->BeginParallel(&end_times[0]);
+  EXPECT_FALSE(!end_times[0].Initialized());
+
+  delay->BeginParallel(&end_times[1]);
+  EXPECT_FALSE(!end_times[1].Initialized());
+
+  delay->EndParallel(end_times[0]);
+  EXPECT_GE((Now() - start_time).ToMilliseconds(), kTargetDurationMs);
+
+  start_time = Now();
+  delay->EndParallel(end_times[1]);
+  EXPECT_LT((Now() - start_time).ToMilliseconds(), kShortDurationMs);
+}
+
+TEST_F(TraceTest, TestVLogTrace) {
+  for (FLAGS_v = 0; FLAGS_v <= 1; FLAGS_v++) {
+    TraceLog* tl = TraceLog::GetInstance();
+    
tl->SetEnabled(CategoryFilter(CategoryFilter::kDefaultCategoryFilterString),
+                   TraceLog::RECORDING_MODE,
+                   TraceLog::RECORD_CONTINUOUSLY);
+    VLOG_AND_TRACE("test", 1) << "hello world";
+    tl->SetDisabled();
+    string trace_json = TraceResultBuffer::FlushTraceLogToString();
+    ASSERT_STR_CONTAINS(trace_json, "hello world");
+    ASSERT_STR_CONTAINS(trace_json, "trace-test.cc");
+  }
+}
+
+namespace {
+string FunctionWithSideEffect(bool* b) {
+  *b = true;
+  return "function-result";
+}
+} // anonymous namespace
+
+// Test that, if tracing is not enabled, a VLOG_AND_TRACE doesn't evaluate its
+// arguments.
+TEST_F(TraceTest, TestVLogTraceLazyEvaluation) {
+  FLAGS_v = 0;
+  bool function_run = false;
+  VLOG_AND_TRACE("test", 1) << FunctionWithSideEffect(&function_run);
+  ASSERT_FALSE(function_run);
+
+  // If we enable verbose logging, we should run the side effect even though
+  // trace logging is disabled.
+  FLAGS_v = 1;
+  VLOG_AND_TRACE("test", 1) << FunctionWithSideEffect(&function_run);
+  ASSERT_TRUE(function_run);
+}
+
+TEST_F(TraceTest, TestVLogAndEchoToConsole) {
+  TraceLog* tl = TraceLog::GetInstance();
+  tl->SetEnabled(CategoryFilter(CategoryFilter::kDefaultCategoryFilterString),
+                 TraceLog::RECORDING_MODE,
+                 TraceLog::ECHO_TO_CONSOLE);
+  FLAGS_v = 1;
+  VLOG_AND_TRACE("test", 1) << "hello world";
+  tl->SetDisabled();
+}
+
+TEST_F(TraceTest, TestTraceMetrics) {
+  scoped_refptr<Trace> trace(new Trace);
+  trace->metrics()->Increment("foo", 10);
+  trace->metrics()->Increment("bar", 10);
+  for (int i = 0; i < 1000; i++) {
+    trace->metrics()->Increment("baz", i);
+  }
+  EXPECT_EQ("{\"bar\":10,\"baz\":499500,\"foo\":10}",
+            trace->MetricsAsJSON());
+
+  {
+    ADOPT_TRACE(trace.get());
+    TRACE_COUNTER_SCOPE_LATENCY_US("test_scope_us");
+    SleepFor(MonoDelta::FromMilliseconds(100));
+  }
+  auto m = trace->metrics()->Get();
+  EXPECT_GE(m["test_scope_us"], 80 * 1000);
+}
+
+} // namespace debug
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/trace.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/trace.cc b/be/src/kudu/util/trace.cc
new file mode 100644
index 0000000..698c915
--- /dev/null
+++ b/be/src/kudu/util/trace.cc
@@ -0,0 +1,261 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/trace.h"
+
+#include <iomanip>
+#include <ios>
+#include <iostream>
+#include <map>
+#include <mutex>
+#include <string>
+#include <sstream>
+#include <utility>
+#include <vector>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/memory/arena.h"
+#include "kudu/util/jsonwriter.h"
+
+namespace kudu {
+
+using strings::internal::SubstituteArg;
+
+__thread Trace* Trace::threadlocal_trace_;
+
+Trace::Trace()
+  : arena_(new ThreadSafeArena(1024, 128*1024)),
+    entries_head_(nullptr),
+    entries_tail_(nullptr) {
+}
+
+Trace::~Trace() {
+}
+
+// Struct which precedes each entry in the trace.
+struct TraceEntry {
+  MicrosecondsInt64 timestamp_micros;
+
+  // The source file and line number which generated the trace message.
+  const char* file_path;
+  int line_number;
+
+  uint32_t message_len;
+  TraceEntry* next;
+
+  // The actual trace message follows the entry header.
+  char* message() {
+    return reinterpret_cast<char*>(this) + sizeof(*this);
+  }
+};
+
+// Get the part of filepath after the last path separator.
+// (Doesn't modify filepath, contrary to basename() in libgen.h.)
+// Borrowed from glog.
+static const char* const_basename(const char* filepath) {
+  const char* base = strrchr(filepath, '/');
+#ifdef OS_WINDOWS  // Look for either path separator in Windows
+  if (!base)
+    base = strrchr(filepath, '\\');
+#endif
+  return base ? (base+1) : filepath;
+}
+
+
+void Trace::SubstituteAndTrace(const char* file_path,
+                               int line_number,
+                               StringPiece format,
+                               const SubstituteArg& arg0, const SubstituteArg& 
arg1,
+                               const SubstituteArg& arg2, const SubstituteArg& 
arg3,
+                               const SubstituteArg& arg4, const SubstituteArg& 
arg5,
+                               const SubstituteArg& arg6, const SubstituteArg& 
arg7,
+                               const SubstituteArg& arg8, const SubstituteArg& 
arg9) {
+  const SubstituteArg* const args_array[] = {
+    &arg0, &arg1, &arg2, &arg3, &arg4, &arg5, &arg6, &arg7, &arg8, &arg9, 
nullptr
+  };
+
+  int msg_len = strings::internal::SubstitutedSize(format, args_array);
+  TraceEntry* entry = NewEntry(msg_len, file_path, line_number);
+  SubstituteToBuffer(format, args_array, entry->message());
+  AddEntry(entry);
+}
+
+TraceEntry* Trace::NewEntry(int msg_len, const char* file_path, int 
line_number) {
+  int size = sizeof(TraceEntry) + msg_len;
+  uint8_t* dst = reinterpret_cast<uint8_t*>(arena_->AllocateBytes(size));
+  TraceEntry* entry = reinterpret_cast<TraceEntry*>(dst);
+  entry->timestamp_micros = GetCurrentTimeMicros();
+  entry->message_len = msg_len;
+  entry->file_path = file_path;
+  entry->line_number = line_number;
+  return entry;
+}
+
+void Trace::AddEntry(TraceEntry* entry) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  entry->next = nullptr;
+
+  if (entries_tail_ != nullptr) {
+    entries_tail_->next = entry;
+  } else {
+    DCHECK(entries_head_ == nullptr);
+    entries_head_ = entry;
+  }
+  entries_tail_ = entry;
+}
+
+void Trace::Dump(std::ostream* out, int flags) const {
+  // Gather a copy of the list of entries under the lock. This is fast
+  // enough that we aren't worried about stalling concurrent tracers
+  // (whereas doing the logging itself while holding the lock might be
+  // too slow, if the output stream is a file, for example).
+  vector<TraceEntry*> entries;
+  vector<pair<StringPiece, scoped_refptr<Trace>>> child_traces;
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    for (TraceEntry* cur = entries_head_;
+         cur != nullptr;
+         cur = cur->next) {
+      entries.push_back(cur);
+    }
+
+    child_traces = child_traces_;
+  }
+
+  // Save original flags.
+  std::ios::fmtflags save_flags(out->flags());
+
+  int64_t prev_usecs = 0;
+  for (TraceEntry* e : entries) {
+    // Log format borrowed from glog/logging.cc
+    time_t secs_since_epoch = e->timestamp_micros / 1000000;
+    int usecs = e->timestamp_micros % 1000000;
+    struct tm tm_time;
+    localtime_r(&secs_since_epoch, &tm_time);
+
+    int64_t usecs_since_prev = 0;
+    if (prev_usecs != 0) {
+      usecs_since_prev = e->timestamp_micros - prev_usecs;
+    }
+    prev_usecs = e->timestamp_micros;
+
+    using std::setw;
+    out->fill('0');
+
+    *out << setw(2) << (1 + tm_time.tm_mon)
+         << setw(2) << tm_time.tm_mday
+         << ' '
+         << setw(2) << tm_time.tm_hour  << ':'
+         << setw(2) << tm_time.tm_min   << ':'
+         << setw(2) << tm_time.tm_sec   << '.'
+         << setw(6) << usecs << ' ';
+    if (flags & INCLUDE_TIME_DELTAS) {
+      out->fill(' ');
+      *out << "(+" << setw(6) << usecs_since_prev << "us) ";
+    }
+    *out << const_basename(e->file_path) << ':' << e->line_number
+         << "] ";
+    out->write(reinterpret_cast<char*>(e) + sizeof(TraceEntry),
+               e->message_len);
+    *out << std::endl;
+  }
+
+  for (const auto& entry : child_traces) {
+    const auto& t = entry.second;
+    *out << "Related trace '" << entry.first << "':" << std::endl;
+    *out << t->DumpToString(flags & (~INCLUDE_METRICS));
+  }
+
+  if (flags & INCLUDE_METRICS) {
+    *out << "Metrics: " << MetricsAsJSON();
+  }
+
+  // Restore stream flags.
+  out->flags(save_flags);
+}
+
+string Trace::DumpToString(int flags) const {
+  std::ostringstream s;
+  Dump(&s, flags);
+  return s.str();
+}
+
+string Trace::MetricsAsJSON() const {
+  std::ostringstream s;
+  JsonWriter jw(&s, JsonWriter::COMPACT);
+  MetricsToJSON(&jw);
+  return s.str();
+}
+
+void Trace::MetricsToJSON(JsonWriter* jw) const {
+  // Convert into a map with 'std::string' keys instead of 'const char*'
+  // keys, so that the results are in a consistent (sorted) order.
+  std::map<string, int64_t> counters;
+  for (const auto& entry : metrics_.Get()) {
+    counters[entry.first] = entry.second;
+  }
+
+  jw->StartObject();
+  for (const auto& e : counters) {
+    jw->String(e.first);
+    jw->Int64(e.second);
+  }
+  vector<pair<StringPiece, scoped_refptr<Trace>>> child_traces;
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    child_traces = child_traces_;
+  }
+
+  if (!child_traces.empty()) {
+    jw->String("child_traces");
+    jw->StartArray();
+
+    for (const auto& e : child_traces) {
+      jw->StartArray();
+      jw->String(e.first.data(), e.first.size());
+      e.second->MetricsToJSON(jw);
+      jw->EndArray();
+    }
+    jw->EndArray();
+  }
+  jw->EndObject();
+}
+
+void Trace::DumpCurrentTrace() {
+  Trace* t = CurrentTrace();
+  if (t == nullptr) {
+    LOG(INFO) << "No trace is currently active.";
+    return;
+  }
+  t->Dump(&std::cerr, true);
+}
+
+void Trace::AddChildTrace(StringPiece label, Trace* child_trace) {
+  CHECK(arena_->RelocateStringPiece(label, &label));
+
+  std::lock_guard<simple_spinlock> l(lock_);
+  scoped_refptr<Trace> ptr(child_trace);
+  child_traces_.emplace_back(label, ptr);
+}
+
+std::vector<std::pair<StringPiece, scoped_refptr<Trace>>> Trace::ChildTraces() 
const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  return child_traces_;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/trace.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/trace.h b/be/src/kudu/util/trace.h
new file mode 100644
index 0000000..4d6df24
--- /dev/null
+++ b/be/src/kudu/util/trace.h
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_TRACE_H
+#define KUDU_UTIL_TRACE_H
+
+#include <iosfwd>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/threading/thread_collision_warner.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/trace_metrics.h"
+
+// Adopt a Trace on the current thread for the duration of the current
+// scope. The old current Trace is restored when the scope is exited.
+//
+// 't' should be a Trace* pointer.
+#define ADOPT_TRACE(t) kudu::ScopedAdoptTrace _adopt_trace(t);
+
+// Issue a trace message, if tracing is enabled in the current thread.
+// See Trace::SubstituteAndTrace for arguments.
+// Example:
+//  TRACE("Acquired timestamp $0", timestamp);
+#define TRACE(format, substitutions...) \
+  do { \
+    kudu::Trace* _trace = Trace::CurrentTrace(); \
+    if (_trace) { \
+      _trace->SubstituteAndTrace(__FILE__, __LINE__, (format),  \
+        ##substitutions); \
+    } \
+  } while (0);
+
+// Like the above, but takes the trace pointer as an explicit argument.
+#define TRACE_TO(trace, format, substitutions...) \
+  (trace)->SubstituteAndTrace(__FILE__, __LINE__, (format), ##substitutions)
+
+// Increment a counter associated with the current trace.
+//
+// Each trace contains a map of counters which can be used to keep
+// request-specific statistics. It is significantly faster to increment
+// a trace counter compared to logging a message. Additionally, having
+// slightly more structured information makes it easier to aggregate
+// and show information back to operators.
+//
+// NOTE: the 'counter_name' MUST be a string which stays alive forever.
+// Typically, this is a compile-time constant. If something other than
+// a constant is required, use TraceMetric::InternName() in order to
+// create a string which will last for the process lifetime. Of course,
+// these strings will never be cleaned up, so it's important to use this
+// judiciously.
+//
+// If no trace is active, this does nothing and does not evaluate its
+// parameters.
+#define TRACE_COUNTER_INCREMENT(counter_name, val) \
+  do { \
+    kudu::Trace* _trace = Trace::CurrentTrace(); \
+    if (_trace) { \
+      _trace->metrics()->Increment(counter_name, val); \
+    } \
+  } while (0);
+
+// Increment a counter for the amount of wall time spent in the current
+// scope. For example:
+//
+//  void DoFoo() {
+//    TRACE_COUNTER_SCOPE_LATENCY_US("foo_us");
+//    ... do expensive Foo thing
+//  }
+//
+//  will result in a trace metric indicating the number of microseconds spent
+//  in invocations of DoFoo().
+#define TRACE_COUNTER_SCOPE_LATENCY_US(counter_name) \
+  ::kudu::ScopedTraceLatencyCounter _scoped_latency(counter_name)
+
+// Construct a constant C string counter name which acts as a sort of
+// coarse-grained histogram for trace metrics.
+#define BUCKETED_COUNTER_NAME(prefix, duration_us)      \
+  [=]() -> const char* {                                \
+    if (duration_us >= 100 * 1000) {                    \
+      return prefix "_gt_100_ms";                       \
+    } else if (duration_us >= 10 * 1000) {              \
+      return prefix "_10-100_ms";                       \
+    } else if (duration_us >= 1000) {                   \
+      return prefix "_1-10_ms";                         \
+    } else {                                            \
+      return prefix "_lt_1ms";                          \
+    }                                                   \
+  }();
+
+namespace kudu {
+
+class JsonWriter;
+class ThreadSafeArena;
+struct TraceEntry;
+
+// A trace for a request or other process. This supports collecting trace 
entries
+// from a number of threads, and later dumping the results to a stream.
+//
+// Callers should generally not add trace messages directly using the public
+// methods of this class. Rather, the TRACE(...) macros defined above should
+// be used such that file/line numbers are automatically included, etc.
+//
+// This class is thread-safe.
+class Trace : public RefCountedThreadSafe<Trace> {
+ public:
+  Trace();
+
+  // Logs a message into the trace buffer.
+  //
+  // See strings::Substitute for details.
+  //
+  // N.B.: the file path passed here is not copied, so should be a static
+  // constant (eg __FILE__).
+  void SubstituteAndTrace(const char* filepath, int line_number,
+                          StringPiece format,
+                          const strings::internal::SubstituteArg& arg0 =
+                            strings::internal::SubstituteArg::NoArg,
+                          const strings::internal::SubstituteArg& arg1 =
+                            strings::internal::SubstituteArg::NoArg,
+                          const strings::internal::SubstituteArg& arg2 =
+                            strings::internal::SubstituteArg::NoArg,
+                          const strings::internal::SubstituteArg& arg3 =
+                            strings::internal::SubstituteArg::NoArg,
+                          const strings::internal::SubstituteArg& arg4 =
+                            strings::internal::SubstituteArg::NoArg,
+                          const strings::internal::SubstituteArg& arg5 =
+                            strings::internal::SubstituteArg::NoArg,
+                          const strings::internal::SubstituteArg& arg6 =
+                            strings::internal::SubstituteArg::NoArg,
+                          const strings::internal::SubstituteArg& arg7 =
+                            strings::internal::SubstituteArg::NoArg,
+                          const strings::internal::SubstituteArg& arg8 =
+                            strings::internal::SubstituteArg::NoArg,
+                          const strings::internal::SubstituteArg& arg9 =
+                            strings::internal::SubstituteArg::NoArg);
+
+  // Dump the trace buffer to the given output stream.
+  //
+  enum {
+    NO_FLAGS = 0,
+
+    // If set, calculate and print the difference between successive trace 
messages.
+    INCLUDE_TIME_DELTAS = 1 << 0,
+    // If set, include a 'Metrics' line showing any attached trace metrics.
+    INCLUDE_METRICS =     1 << 1,
+
+    INCLUDE_ALL = INCLUDE_TIME_DELTAS | INCLUDE_METRICS
+  };
+  void Dump(std::ostream* out, int flags) const;
+
+  // Dump the trace buffer as a string.
+  std::string DumpToString(int flags = INCLUDE_ALL) const;
+
+  std::string MetricsAsJSON() const;
+
+  // Attaches the given trace which will get appended at the end when Dumping.
+  //
+  // The 'label' does not necessarily have to be unique, and is used to 
identify
+  // the child trace when dumped. The contents of the StringPiece are copied
+  // into this trace's arena.
+  void AddChildTrace(StringPiece label, Trace* child_trace);
+
+  // Return a copy of the current set of related "child" traces.
+  std::vector<std::pair<StringPiece, scoped_refptr<Trace>>> ChildTraces() 
const;
+
+  // Return the current trace attached to this thread, if there is one.
+  static Trace* CurrentTrace() {
+    return threadlocal_trace_;
+  }
+
+  // Simple function to dump the current trace to stderr, if one is
+  // available. This is meant for usage when debugging in gdb via
+  // 'call kudu::Trace::DumpCurrentTrace();'.
+  static void DumpCurrentTrace();
+
+  TraceMetrics* metrics() {
+    return &metrics_;
+  }
+  const TraceMetrics& metrics() const {
+    return metrics_;
+  }
+
+ private:
+  friend class ScopedAdoptTrace;
+  friend class RefCountedThreadSafe<Trace>;
+  ~Trace();
+
+  // The current trace for this thread. Threads should only set this using
+  // using ScopedAdoptTrace, which handles reference counting the underlying
+  // object.
+  static __thread Trace* threadlocal_trace_;
+
+  // Allocate a new entry from the arena, with enough space to hold a
+  // message of length 'len'.
+  TraceEntry* NewEntry(int len, const char* file_path, int line_number);
+
+  // Add the entry to the linked list of entries.
+  void AddEntry(TraceEntry* entry);
+
+  void MetricsToJSON(JsonWriter* jw) const;
+
+  gscoped_ptr<ThreadSafeArena> arena_;
+
+  // Lock protecting the entries linked list.
+  mutable simple_spinlock lock_;
+  // The head of the linked list of entries (allocated inside arena_)
+  TraceEntry* entries_head_;
+  // The tail of the linked list of entries (allocated inside arena_)
+  TraceEntry* entries_tail_;
+
+  std::vector<std::pair<StringPiece, scoped_refptr<Trace>>> child_traces_;
+
+  TraceMetrics metrics_;
+
+  DISALLOW_COPY_AND_ASSIGN(Trace);
+};
+
+// Adopt a Trace object into the current thread for the duration
+// of this object.
+// This should only be used on the stack (and thus created and destroyed
+// on the same thread)
+class ScopedAdoptTrace {
+ public:
+  explicit ScopedAdoptTrace(Trace* t) :
+    old_trace_(Trace::threadlocal_trace_) {
+    Trace::threadlocal_trace_ = t;
+    if (t) {
+      t->AddRef();
+    }
+    DFAKE_SCOPED_LOCK_THREAD_LOCKED(ctor_dtor_);
+  }
+
+  ~ScopedAdoptTrace() {
+    auto t = Trace::threadlocal_trace_;
+    Trace::threadlocal_trace_ = old_trace_;
+
+    // It's critical that we Release() the reference count on 't' only
+    // after we've unset the thread-local variable. Otherwise, we can hit
+    // a nasty interaction with tcmalloc contention profiling. Consider
+    // the following sequence:
+    //
+    //   1. threadlocal_trace_ has refcount = 1
+    //   2. we call threadlocal_trace_->Release() which decrements refcount to 0
+    //   3. this calls 'delete' on the Trace object
+    //   3a. this calls tcmalloc free() on the Trace and various sub-objects
+    //   3b. the free() calls may end up experiencing contention in tcmalloc
+    //   3c. we try to account the contention in threadlocal_trace_'s 
TraceMetrics,
+    //       but it has already been freed.
+    //
+    // In the best case, we just scribble into some free tcmalloc memory. In 
the
+    // worst case, tcmalloc would have already re-used this memory for a new
+    // allocation on another thread, and we end up overwriting someone else's 
memory.
+    //
+    // Waiting to Release() only after 'unpublishing' the trace solves this.
+    if (t) {
+      t->Release();
+    }
+    DFAKE_SCOPED_LOCK_THREAD_LOCKED(ctor_dtor_);
+  }
+
+ private:
+  DFAKE_MUTEX(ctor_dtor_);
+  Trace* old_trace_;
+
+  DISALLOW_COPY_AND_ASSIGN(ScopedAdoptTrace);
+};
+
+// Implementation for TRACE_COUNTER_SCOPE_LATENCY_US(...) macro above.
+class ScopedTraceLatencyCounter {
+ public:
+  explicit ScopedTraceLatencyCounter(const char* counter)
+      : counter_(counter),
+        start_time_(GetCurrentTimeMicros()) {
+  }
+
+  ~ScopedTraceLatencyCounter() {
+    TRACE_COUNTER_INCREMENT(counter_, GetCurrentTimeMicros() - start_time_);
+  }
+
+ private:
+  const char* const counter_;
+  MicrosecondsInt64 start_time_;
+  DISALLOW_COPY_AND_ASSIGN(ScopedTraceLatencyCounter);
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_TRACE_H */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/trace_metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/trace_metrics.cc 
b/be/src/kudu/util/trace_metrics.cc
new file mode 100644
index 0000000..ba4ad97
--- /dev/null
+++ b/be/src/kudu/util/trace_metrics.cc
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/trace_metrics.h"
+
+#include <algorithm>
+#include <ctype.h>
+#include <glog/stl_logging.h>
+#include <map>
+#include <mutex>
+#include <string>
+
+#include "kudu/util/debug/leakcheck_disabler.h"
+
+using std::string;
+
+namespace kudu {
+
+// Make glog's STL-compatible operators visible inside this namespace.
+using ::operator<<;
+
+namespace {
+
+static simple_spinlock g_intern_map_lock;
+typedef std::map<string, const char*> InternMap;
+static InternMap* g_intern_map;
+
+} // anonymous namespace
+
+const char* TraceMetrics::InternName(const string& name) {
+  DCHECK(std::all_of(name.begin(), name.end(), [] (char c) { return 
isprint(c); } ))
+      << "not printable: " << name;
+
+  debug::ScopedLeakCheckDisabler no_leakcheck;
+  std::lock_guard<simple_spinlock> l(g_intern_map_lock);
+  if (g_intern_map == nullptr) {
+    g_intern_map = new InternMap();
+  }
+
+  InternMap::iterator it = g_intern_map->find(name);
+  if (it != g_intern_map->end()) {
+    return it->second;
+  }
+
+  const char* dup = strdup(name.c_str());
+  (*g_intern_map)[name] = dup;
+
+  // We don't expect this map to grow large.
+  DCHECK_LT(g_intern_map->size(), 100) <<
+      "Too many interned strings: " << *g_intern_map;
+
+  return dup;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/trace_metrics.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/trace_metrics.h b/be/src/kudu/util/trace_metrics.h
new file mode 100644
index 0000000..a230821
--- /dev/null
+++ b/be/src/kudu/util/trace_metrics.h
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/locks.h"
+
+#include <map>
+#include <mutex>
+#include <string>
+
+namespace kudu {
+
+// A simple map of constant string names to integer counters.
+//
+// Typically, the TRACE_COUNTER_INCREMENT(...) macro defined in
+// trace.h is used to increment a counter within this map.
+//
+// This currently is just a thin wrapper around a spinlocked map,
+// but if it becomes noticeable in the CPU profile, various optimizations
+// are plausible.
+class TraceMetrics {
+ public:
+  TraceMetrics() : tcmalloc_contention_cycles_(0) {}
+  ~TraceMetrics() {}
+
+  // Internalize the given string by duplicating it into a process-wide
+  // pool. If this string has already been interned, returns a pointer
+  // to a previous instance. Otherwise, copies it into the pool.
+  //
+  // The resulting strings are purposefully leaked, so this should only
+  // be used in cases where the number of unique strings that will be
+  // passed is relatively low (i.e. not user-specified).
+  //
+  // Because 'name' is exposed back to operators, it must be a printable
+  // ASCII string.
+  static const char* InternName(const std::string& name);
+
+  // Increment the given counter.
+  void Increment(const char* name, int64_t amount);
+
+  // Return a copy of the current counter map.
+  std::map<const char*, int64_t> Get() const;
+
+  // Increment the number of cycles spent in tcmalloc lock contention.
+  //
+  // tcmalloc contention is stored separately from other metrics since
+  // it is incremented from a code path that prohibits allocation.
+  void IncrementTcmallocContentionCycles(int64_t cycles) {
+    tcmalloc_contention_cycles_.IncrementBy(cycles);
+  }
+
+  // Return metric's current value.
+  //
+  // NOTE: the 'name' MUST be the same const char* which is used for
+  // insertion. This is because we do pointer-wise comparison internally.
+  int64_t GetMetric(const char* name) const;
+
+ private:
+  mutable simple_spinlock lock_;
+  std::map<const char*, int64_t> counters_;
+  AtomicInt<int64_t> tcmalloc_contention_cycles_;
+
+  DISALLOW_COPY_AND_ASSIGN(TraceMetrics);
+};
+
+inline void TraceMetrics::Increment(const char* name, int64_t amount) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  counters_[name] += amount;
+}
+
+inline std::map<const char*, int64_t> TraceMetrics::Get() const {
+  std::unique_lock<simple_spinlock> l(lock_);
+  auto m = counters_;
+  l.unlock();
+
+  auto v = tcmalloc_contention_cycles_.Load();
+  if (v > 0) {
+    m["tcmalloc_contention_cycles"] = v;
+  }
+  return m;
+}
+
+inline int64_t TraceMetrics::GetMetric(const char* name) const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  return FindWithDefault(counters_, name, 0);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/url-coding-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/url-coding-test.cc 
b/be/src/kudu/util/url-coding-test.cc
new file mode 100644
index 0000000..e83de64
--- /dev/null
+++ b/be/src/kudu/util/url-coding-test.cc
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <stdlib.h>
+#include <stdio.h>
+#include <iostream>
+#include <gtest/gtest.h>
+#include "kudu/util/url-coding.h"
+
+using namespace std; // NOLINT(*)
+
+namespace kudu {
+
+// Tests encoding/decoding of input.  If expected_encoded is non-empty, the
+// encoded string is validated against it.
+void TestUrl(const string& input, const string& expected_encoded, bool 
hive_compat) {
+  string intermediate;
+  UrlEncode(input, &intermediate, hive_compat);
+  string output;
+  if (!expected_encoded.empty()) {
+    EXPECT_EQ(expected_encoded, intermediate);
+  }
+  EXPECT_TRUE(UrlDecode(intermediate, &output, hive_compat));
+  EXPECT_EQ(input, output);
+
+  // Convert string to vector and try that also
+  vector<uint8_t> input_vector;
+  input_vector.resize(input.size());
+  if (!input.empty()) {
+    memcpy(&input_vector[0], input.c_str(), input.size());
+  }
+  string intermediate2;
+  UrlEncode(input_vector, &intermediate2, hive_compat);
+  EXPECT_EQ(intermediate, intermediate2);
+}
+
+void TestBase64(const string& input, const string& expected_encoded) {
+  string intermediate;
+  Base64Encode(input, &intermediate);
+  string output;
+  if (!expected_encoded.empty()) {
+    EXPECT_EQ(intermediate, expected_encoded);
+  }
+  EXPECT_TRUE(Base64Decode(intermediate, &output));
+  EXPECT_EQ(input, output);
+
+  // Convert string to vector and try that also
+  vector<uint8_t> input_vector;
+  input_vector.resize(input.size());
+  memcpy(&input_vector[0], input.c_str(), input.size());
+  string intermediate2;
+  Base64Encode(input_vector, &intermediate2);
+  EXPECT_EQ(intermediate, intermediate2);
+}
+
+// Test URL encoding. Check that the values that are put in are the
+// same that come out.
+TEST(UrlCodingTest, Basic) {
+  string input = 
"ABCDEFGHIJKLMNOPQRSTUWXYZ1234567890~!@#$%^&*()<>?,./:\";'{}|[]\\_+-=";
+  TestUrl(input, "", false);
+  TestUrl(input, "", true);
+}
+
+TEST(UrlCodingTest, HiveExceptions) {
+  TestUrl(" +", " +", true);
+}
+
+TEST(UrlCodingTest, BlankString) {
+  TestUrl("", "", false);
+  TestUrl("", "", true);
+}
+
+TEST(UrlCodingTest, PathSeparators) {
+  TestUrl("/home/impala/directory/", "%2Fhome%2Fimpala%2Fdirectory%2F", false);
+  TestUrl("/home/impala/directory/", "%2Fhome%2Fimpala%2Fdirectory%2F", true);
+}
+
+TEST(Base64Test, Basic) {
+  TestBase64("a", "YQ==");
+  TestBase64("ab", "YWI=");
+  TestBase64("abc", "YWJj");
+  TestBase64("abcd", "YWJjZA==");
+  TestBase64("abcde", "YWJjZGU=");
+  TestBase64("abcdef", "YWJjZGVm");
+}
+
+TEST(HtmlEscapingTest, Basic) {
+  string before = "<html><body>&amp";
+  ostringstream after;
+  EscapeForHtml(before, &after);
+  EXPECT_EQ(after.str(), "&lt;html&gt;&lt;body&gt;&amp;amp");
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/url-coding.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/url-coding.cc b/be/src/kudu/util/url-coding.cc
new file mode 100644
index 0000000..afbaa44
--- /dev/null
+++ b/be/src/kudu/util/url-coding.cc
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "kudu/util/url-coding.h"
+
+#include <algorithm>
+#include <exception>
+#include <sstream>
+
+#include <boost/algorithm/string.hpp>
+#include <boost/archive/iterators/base64_from_binary.hpp>
+#include <boost/archive/iterators/binary_from_base64.hpp>
+#include <boost/archive/iterators/transform_width.hpp>
+#include <glog/logging.h>
+
+using std::string;
+using std::vector;
+using namespace boost::archive::iterators; // NOLINT(*)
+
+namespace kudu {
+
+// Hive selectively encodes characters. This is the whitelist of
+// characters it will encode.
+// See common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+// in the Hive source code for the source of this list.
+static boost::function<bool (char)> HiveShouldEscape = 
boost::is_any_of("\"#%\\*/:=?\u00FF"); // NOLINT(*)
+
+// It is more convenient to maintain the complement of the set of
+// characters to escape when not in Hive-compat mode.
+static boost::function<bool (char)> ShouldNotEscape = 
boost::is_any_of("-_.~"); // NOLINT(*)
+
+static inline void UrlEncode(const char* in, int in_len, string* out, bool 
hive_compat) {
+  (*out).reserve(in_len);
+  std::ostringstream ss;
+  for (int i = 0; i < in_len; ++i) {
+    const char ch = in[i];
+    // Escape the character iff a) we are in Hive-compat mode and the
+    // character is in the Hive whitelist or b) we are not in
+    // Hive-compat mode, and the character is not alphanumeric or one
+    // of the four commonly excluded characters.
+    if ((hive_compat && HiveShouldEscape(ch)) ||
+        (!hive_compat && !(isalnum(ch) || ShouldNotEscape(ch)))) {
+      ss << '%' << std::uppercase << std::hex << static_cast<uint32_t>(ch);
+    } else {
+      ss << ch;
+    }
+  }
+
+  (*out) = ss.str();
+}
+
+void UrlEncode(const vector<uint8_t>& in, string* out, bool hive_compat) {
+  if (in.empty()) {
+    *out = "";
+  } else {
+    UrlEncode(reinterpret_cast<const char*>(&in[0]), in.size(), out, 
hive_compat);
+  }
+}
+
+void UrlEncode(const string& in, string* out, bool hive_compat) {
+  UrlEncode(in.c_str(), in.size(), out, hive_compat);
+}
+
+string UrlEncodeToString(const std::string& in, bool hive_compat) {
+  string ret;
+  UrlEncode(in, &ret, hive_compat);
+  return ret;
+}
+
+// Adapted from
+// http://www.boost.org/doc/libs/1_40_0/doc/html/boost_asio/
+//   example/http/server3/request_handler.cpp
+// See http://www.boost.org/LICENSE_1_0.txt for license for this method.
+bool UrlDecode(const string& in, string* out, bool hive_compat) {
+  out->clear();
+  out->reserve(in.size());
+  for (size_t i = 0; i < in.size(); ++i) {
+    if (in[i] == '%') {
+      if (i + 3 <= in.size()) {
+        int value = 0;
+        std::istringstream is(in.substr(i + 1, 2));
+        if (is >> std::hex >> value) {
+          (*out) += static_cast<char>(value);
+          i += 2;
+        } else {
+          return false;
+        }
+      } else {
+        return false;
+      }
+    } else if (!hive_compat && in[i] == '+') { // Hive does not encode ' ' as 
'+'
+      (*out) += ' ';
+    } else {
+      (*out) += in[i];
+    }
+  }
+  return true;
+}
+
+static inline void Base64Encode(const char* in, int in_len, 
std::ostringstream* out) {
+  typedef base64_from_binary<transform_width<const char*, 6, 8> > 
base64_encode;
+  // Base64 encodes 8 byte chars as 6 bit values.
+  std::ostringstream::pos_type len_before = out->tellp();
+  copy(base64_encode(in), base64_encode(in + in_len), 
std::ostream_iterator<char>(*out));
+  int bytes_written = out->tellp() - len_before;
+  // Pad with = to make it valid base64 encoded string
+  int num_pad = bytes_written % 4;
+  if (num_pad != 0) {
+    num_pad = 4 - num_pad;
+    for (int i = 0; i < num_pad; ++i) {
+      (*out) << "=";
+    }
+  }
+  DCHECK_EQ(out->str().size() % 4, 0);
+}
+
+void Base64Encode(const vector<uint8_t>& in, string* out) {
+  if (in.empty()) {
+    *out = "";
+  } else {
+    std::ostringstream ss;
+    Base64Encode(in, &ss);
+    *out = ss.str();
+  }
+}
+
+void Base64Encode(const vector<uint8_t>& in, std::ostringstream* out) {
+  if (!in.empty()) {
+    // Boost does not like non-null terminated strings
+    string tmp(reinterpret_cast<const char*>(&in[0]), in.size());
+    Base64Encode(tmp.c_str(), tmp.size(), out);
+  }
+}
+
+void Base64Encode(const string& in, string* out) {
+  std::ostringstream ss;
+  Base64Encode(in.c_str(), in.size(), &ss);
+  *out = ss.str();
+}
+
+void Base64Encode(const string& in, std::ostringstream* out) {
+  Base64Encode(in.c_str(), in.size(), out);
+}
+
+bool Base64Decode(const string& in, string* out) {
+  typedef transform_width<binary_from_base64<string::const_iterator>, 8, 6> 
base64_decode;
+  string tmp = in;
+  // Replace padding with base64 encoded NULL
+  replace(tmp.begin(), tmp.end(), '=', 'A');
+  try {
+    *out = string(base64_decode(tmp.begin()), base64_decode(tmp.end()));
+  } catch(std::exception& e) {
+    return false;
+  }
+
+  // Remove trailing '\0' that were added as padding.  Since \0 is special,
+  // the boost functions get confused so do this manually.
+  int num_padded_chars = 0;
+  for (int i = out->size() - 1; i >= 0; --i) {
+    if ((*out)[i] != '\0') break;
+    ++num_padded_chars;
+  }
+  out->resize(out->size() - num_padded_chars);
+  return true;
+}
+
+void EscapeForHtml(const string& in, std::ostringstream* out) {
+  DCHECK(out != nullptr);
+  for (const char& c : in) {
+    switch (c) {
+      case '<': (*out) << "&lt;";
+                break;
+      case '>': (*out) << "&gt;";
+                break;
+      case '&': (*out) << "&amp;";
+                break;
+      default: (*out) << c;
+    }
+  }
+}
+
+std::string EscapeForHtmlToString(const std::string& in) {
+  std::ostringstream str;
+  EscapeForHtml(in, &str);
+  return str.str();
+}
+
+} // namespace kudu


Reply via email to