http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/minidump.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/minidump.cc b/be/src/kudu/util/minidump.cc
new file mode 100644
index 0000000..c088ae7
--- /dev/null
+++ b/be/src/kudu/util/minidump.cc
@@ -0,0 +1,378 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/minidump.h"
+
+#include <signal.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <atomic>
+#include <memory>
+#include <string>
+
+#if defined(__linux__)
+#include <breakpad/client/linux/handler/exception_handler.h>
+#include <breakpad/common/linux/linux_libc_support.h>
+#endif // defined(__linux__)
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/linux_syscall_support.h"
+#include "kudu/gutil/strings/human_readable.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/env.h"
+#include "kudu/util/env_util.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/status.h"
+
+using kudu::env_util::CreateDirIfMissing;
+using std::string;
+
+#if defined(__linux__)
+static constexpr bool kMinidumpPlatformSupported = true;
+#else
+static constexpr bool kMinidumpPlatformSupported = false;
+#endif // defined(__linux__)
+
+DECLARE_string(log_dir);
+
+DEFINE_bool(enable_minidumps, kMinidumpPlatformSupported,
+            "Whether to enable minidump generation upon process crash or 
SIGUSR1. "
+            "Currently only supported on Linux systems.");
+TAG_FLAG(enable_minidumps, advanced);
+TAG_FLAG(enable_minidumps, evolving);
+static bool ValidateMinidumpEnabled(const char* /*flagname*/, bool value) {
+  if (value && !kMinidumpPlatformSupported) {
+    return false; // NOLINT(*)
+  }
+  return true;
+}
+DEFINE_validator(enable_minidumps, &ValidateMinidumpEnabled);
+
+DEFINE_string(minidump_path, "minidumps", "Directory to write minidump files 
to. This "
+    "can be either an absolute path or a path relative to --log_dir. Each 
daemon will "
+    "create an additional sub-directory to prevent naming conflicts and to 
make it "
+    "easier to identify a crashing daemon. Minidump files contain 
crash-related "
+    "information in a compressed format. Minidumps will be written when a 
daemon exits "
+    "unexpectedly, for example on an unhandled exception or signal, or when a "
+    "SIGUSR1 signal is sent to the process. Cannot be set to an empty value.");
+TAG_FLAG(minidump_path, evolving);
+// The minidump path cannot be empty.
+static bool ValidateMinidumpPath(const char* /*flagname*/, const string& 
value) {
+  return !value.empty();
+}
+DEFINE_validator(minidump_path, &ValidateMinidumpPath);
+
+DEFINE_int32(max_minidumps, 9, "Maximum number of minidump files to keep per 
daemon. "
+    "Older files are removed first. Set to 0 to keep all minidump files.");
+TAG_FLAG(max_minidumps, evolving);
+
+DEFINE_int32(minidump_size_limit_hint_kb, 20480, "Size limit hint for minidump 
files in "
+    "KB. If a minidump exceeds this value, then breakpad will reduce the stack 
memory it "
+    "collects for each thread from 8KB to 2KB. However it will always include 
the full "
+    "stack memory for the first 20 threads, including the thread that 
crashed.");
+TAG_FLAG(minidump_size_limit_hint_kb, advanced);
+TAG_FLAG(minidump_size_limit_hint_kb, evolving);
+
+#if !defined(__linux__)
+namespace google_breakpad {
+// Define this as an empty class to avoid an undefined symbol error on Mac.
+class ExceptionHandler {
+ public:
+  ExceptionHandler() {}
+  ~ExceptionHandler() {}
+};
+} // namespace google_breakpad
+#endif // !defined(__linux__)
+
+namespace kudu {
+
+static sigset_t GetSigset(int signo) {
+  sigset_t signals;
+  CHECK_EQ(0, sigemptyset(&signals));
+  CHECK_EQ(0, sigaddset(&signals, signo));
+  return signals;
+}
+
+#if defined(__linux__)
+
+// Called by the exception handler before minidump is produced.
+// Minidump is only written if this returns true.
+static bool FilterCallback(void* /*context*/) {
+  return true;
+}
+
+// Write two null-terminated strings and a newline to both stdout and stderr.
+static void WriteLineStdoutStderr(const char* msg1, const char* msg2) {
+  // We use Breakpad's reimplementation of strlen(), called my_strlen(), from
+  // linux_libc_support.h to avoid calling into libc.
+  // A comment from linux_libc_support.h is reproduced here:
+  // "This header provides replacements for libc functions that we need. If we
+  // call the libc functions directly we risk crashing in the dynamic linker as
+  // it tries to resolve uncached PLT entries."
+  int msg1_len = my_strlen(msg1);
+  int msg2_len = my_strlen(msg2);
+
+  // We use sys_write() from linux_syscall_support.h here per the
+  // recommendation of the breakpad docs for the same reasons as above.
+  for (int fd : {STDOUT_FILENO, STDERR_FILENO}) {
+    sys_write(fd, msg1, msg1_len);
+    sys_write(fd, msg2, msg2_len);
+    sys_write(fd, "\n", 1);
+  }
+}
+
+// Callback for breakpad. It is called whenever a minidump file has been
+// written and should not be called directly. It logs the event before breakpad
+// crashes the process. Due to the process being in a failed state we write to
+// stdout/stderr and let the surrounding redirection make sure the output gets
+// logged. The calls might still fail in unknown scenarios as the process is in
+// a broken state. However we don't rely on them as the minidump file has been
+// written already.
+static bool DumpCallback(const google_breakpad::MinidumpDescriptor& descriptor,
+                         void* context, bool succeeded) {
+
+  // Indicate whether a minidump file was written successfully. Write message
+  // to stdout/stderr, which will usually be captured in the INFO/ERROR log.
+  if (succeeded) {
+    WriteLineStdoutStderr("Wrote minidump to ", descriptor.path());
+  } else {
+    WriteLineStdoutStderr("Failed to write minidump to ", descriptor.path());
+  }
+
+  // If invoked by a user signal, return the actual success or failure of
+  // writing the minidump file so that we can print a user-friendly error
+  // message if writing the minidump fails.
+  bool is_user_signal = context != nullptr && 
*reinterpret_cast<bool*>(context);
+  if (is_user_signal) {
+    return succeeded;
+  }
+
+  // For crash signals. If we didn't want to invoke the previously-installed
+  // signal handler from glog, we would return the value received in
+  // 'succeeded' as described in the breakpad documentation. If this callback
+  // function returned true, breakpad would not invoke the previously-installed
+  // signal handler; instead, it would invoke the default signal handler, which
+  // would cause the process to crash immediately after writing the minidump.
+  //
+  // We make this callback always return false so that breakpad will invoke any
+  // previously-installed signal handler afterward. We want that to happen
+  // because the glog signal handlers print a helpful stacktrace on crash.
+  // That's convenient to have, because unlike a minidump, it doesn't need to
+  // be decoded to be useful for debugging.
+  return false;
+}
+
+// Failure function that simply calls abort().
+static void AbortFailureFunction() {
+  abort();
+}
+
+bool MinidumpExceptionHandler::WriteMinidump() {
+  bool user_signal = true;
+  return google_breakpad::ExceptionHandler::WriteMinidump(minidump_dir(),
+                                                          &DumpCallback,
+                                                          &user_signal);
+}
+
+Status MinidumpExceptionHandler::InitMinidumpExceptionHandler() {
+  minidump_dir_ = FLAGS_minidump_path;
+  if (minidump_dir_[0] != '/') {
+    minidump_dir_ = JoinPathSegments(FLAGS_log_dir, minidump_dir_);
+  }
+
+  // Create the first-level minidump directory.
+  Env* env = Env::Default();
+  RETURN_NOT_OK_PREPEND(CreateDirIfMissing(env, minidump_dir_),
+                        "Error creating top-level minidump directory");
+
+  // Add the executable name to the path where minidumps will be written. This 
makes
+  // identification easier and prevents name collisions between the files.
+  // This is also consistent with how Impala organizes its minidump files.
+  const char* exe_name = gflags::ProgramInvocationShortName();
+  minidump_dir_ = JoinPathSegments(minidump_dir_, exe_name);
+
+  // Create the directory if it is not there. The minidump doesn't get written 
if there is
+  // no directory.
+  RETURN_NOT_OK_PREPEND(CreateDirIfMissing(env, minidump_dir_),
+                        "Error creating minidump directory");
+
+  // Verify that the minidump directory really is a directory. We canonicalize
+  // in case it's a symlink to a directory.
+  string canonical_minidump_path;
+  RETURN_NOT_OK(env->Canonicalize(minidump_dir_, &canonical_minidump_path));
+  bool is_dir;
+  RETURN_NOT_OK(env->IsDirectory(canonical_minidump_path, &is_dir));
+  if (!is_dir) {
+    return Status::IOError("Unable to create minidump directory", 
canonical_minidump_path);
+  }
+
+  google_breakpad::MinidumpDescriptor desc(minidump_dir_);
+
+  // Limit filesize if configured.
+  if (FLAGS_minidump_size_limit_hint_kb > 0) {
+    size_t size_limit = 1024 * 
static_cast<int64_t>(FLAGS_minidump_size_limit_hint_kb);
+    LOG(INFO) << "Setting minidump size limit to "
+              << HumanReadableNumBytes::ToStringWithoutRounding(size_limit);
+    desc.set_size_limit(size_limit);
+  }
+
+  // If we don't uninstall the glog failure function when minidumps are enabled
+  // then we get two (2) stack traces printed from a LOG(FATAL) or CHECK(): one
+  // from the glog failure function and one from the glog signal handler. That
+  // is because we always return false in DumpCallback() in the non-user signal
+  // case.
+  google::InstallFailureFunction(&AbortFailureFunction);
+
+  breakpad_handler_.reset(
+      new google_breakpad::ExceptionHandler(desc,           // Path to 
minidump directory.
+                                            FilterCallback, // Indicates 
whether to write the dump.
+                                            DumpCallback,   // Output a log 
message when dumping.
+                                            nullptr,        // Optional 
context for callbacks.
+                                            true,           // Whether to 
install a crash handler.
+                                            -1));           // -1: Use 
in-process dump generation.
+
+  return Status::OK();
+}
+
+Status MinidumpExceptionHandler::RegisterMinidumpExceptionHandler() {
+  if (!FLAGS_enable_minidumps) return Status::OK();
+
+  // Ensure only one active instance is alive per process at any given time.
+  CHECK_EQ(0, MinidumpExceptionHandler::current_num_instances_.fetch_add(1));
+  RETURN_NOT_OK(InitMinidumpExceptionHandler());
+  RETURN_NOT_OK(StartUserSignalHandlerThread());
+  return Status::OK();
+}
+
+void MinidumpExceptionHandler::UnregisterMinidumpExceptionHandler() {
+  if (!FLAGS_enable_minidumps) return;
+
+  StopUserSignalHandlerThread();
+  CHECK_EQ(1, MinidumpExceptionHandler::current_num_instances_.fetch_sub(1));
+}
+
+Status MinidumpExceptionHandler::StartUserSignalHandlerThread() {
+  user_signal_handler_thread_running_.store(true, std::memory_order_relaxed);
+  return Thread::Create("minidump", "sigusr1-handler",
+                        &MinidumpExceptionHandler::RunUserSignalHandlerThread,
+                        this, &user_signal_handler_thread_);
+}
+
+void MinidumpExceptionHandler::StopUserSignalHandlerThread() {
+  user_signal_handler_thread_running_.store(false, std::memory_order_relaxed);
+  std::atomic_thread_fence(std::memory_order_release); // Store before signal.
+  // Send SIGUSR1 signal to thread, which will wake it up.
+  kill(getpid(), SIGUSR1);
+  user_signal_handler_thread_->Join();
+}
+
+void MinidumpExceptionHandler::RunUserSignalHandlerThread() {
+  sigset_t signals = GetSigset(SIGUSR1);
+  while (true) {
+    int signal;
+    int err = sigwait(&signals, &signal);
+    CHECK(err == 0) << "sigwait(): " << ErrnoToString(err) << ": " << err;
+    CHECK_EQ(SIGUSR1, signal);
+    if (!user_signal_handler_thread_running_.load(std::memory_order_relaxed)) {
+      // Exit thread if we are shutting down.
+      return;
+    }
+    if (!WriteMinidump()) {
+      LOG(WARNING) << "Received USR1 signal but failed to write minidump";
+    }
+  }
+}
+
+#else // defined(__linux__)
+
+// At the time of writing, we don't support breakpad on Mac so we just stub out
+// all the methods defined in the header file.
+
+Status MinidumpExceptionHandler::InitMinidumpExceptionHandler() {
+  return Status::OK();
+}
+
+// No-op on non-Linux platforms.
+Status MinidumpExceptionHandler::RegisterMinidumpExceptionHandler() {
+  return Status::OK();
+}
+
+void MinidumpExceptionHandler::UnregisterMinidumpExceptionHandler() {
+}
+
+bool MinidumpExceptionHandler::WriteMinidump() {
+  return true;
+}
+
+Status MinidumpExceptionHandler::StartUserSignalHandlerThread() {
+  return Status::OK();
+}
+
+void MinidumpExceptionHandler::StopUserSignalHandlerThread() {
+}
+
+void MinidumpExceptionHandler::RunUserSignalHandlerThread() {
+}
+
+#endif // defined(__linux__)
+
+std::atomic<int> MinidumpExceptionHandler::current_num_instances_;
+
+MinidumpExceptionHandler::MinidumpExceptionHandler() {
+  CHECK_OK(RegisterMinidumpExceptionHandler());
+}
+
+MinidumpExceptionHandler::~MinidumpExceptionHandler() {
+  UnregisterMinidumpExceptionHandler();
+}
+
+Status MinidumpExceptionHandler::DeleteExcessMinidumpFiles(Env* env) {
+  // Do not delete minidump files if minidumps are disabled.
+  if (!FLAGS_enable_minidumps) return Status::OK();
+
+  int32_t max_minidumps = FLAGS_max_minidumps;
+  // Disable rotation if set to 0 or less.
+  if (max_minidumps <= 0) return Status::OK();
+
+  // Minidump filenames are created by breakpad in the following format, for 
example:
+  // 7b57915b-ee6a-dbc5-21e59491-5c60a2cf.dmp.
+  string pattern = JoinPathSegments(minidump_dir(), "*.dmp");
+
+  // Use mtime to determine which minidumps to delete. While this could
+  // potentially be ambiguous if many minidumps were created in quick
+  // succession, users can always increase 'FLAGS_max_minidumps' if desired
+  // in order to work around the problem.
+  return env_util::DeleteExcessFilesByPattern(env, pattern, max_minidumps);
+}
+
+string MinidumpExceptionHandler::minidump_dir() const {
+  return minidump_dir_;
+}
+
+Status BlockSigUSR1() {
+  sigset_t signals = GetSigset(SIGUSR1);
+  int ret = pthread_sigmask(SIG_BLOCK, &signals, nullptr);
+  if (ret == 0) return Status::OK();
+  return Status::InvalidArgument("pthread_sigmask", ErrnoToString(ret), ret);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/minidump.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/minidump.h b/be/src/kudu/util/minidump.h
new file mode 100644
index 0000000..459639b
--- /dev/null
+++ b/be/src/kudu/util/minidump.h
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <memory>
+#include <string>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/thread.h"
+
+namespace google_breakpad {
+class ExceptionHandler;
+} // namespace google_breakpad
+
+namespace kudu {
+
+class Env;
+class Status;
+
+// While an instance of this class is in scope, a Breakpad minidump handler
+// will generate a minidump for the current program if it crashes or if it
+// received a USR1 signal. This class must be instantiated after initializing
+// the gflags library. When used in conjuction with glog, or other signal
+// handling facilities, this class must be invoked after installing those
+// signal handlers.
+//
+// The BlockSigUSR1() function should be called before spawning any threads in
+// order to block the USR1 signal from crashing the process. This class relies
+// on that signal being blocked by all threads in order to safely generate
+// minidumps in response to the USR1 signal.
+//
+// Only one instance of this class may be instantiated at a time.
+//
+// For more information on Google Breakpad, see its documentation at:
+// 
http://chromium.googlesource.com/breakpad/breakpad/+/master/docs/getting_started_with_breakpad.md
+class MinidumpExceptionHandler {
+ public:
+  MinidumpExceptionHandler();
+  ~MinidumpExceptionHandler();
+
+  // Write a minidump immediately. Can be used to generate a minidump
+  // independently of a crash. Should not be called from a signal handler or a
+  // crash context because it uses the heap.
+  bool WriteMinidump();
+
+  // Deletes excess minidump files beyond the configured max of
+  // 'FLAGS_max_minidumps'. Uses the file's modified time to determine recency.
+  // Does nothing if 'FLAGS_enabled_minidumps' is false.
+  Status DeleteExcessMinidumpFiles(Env* env);
+
+  // Get the path to the directory that will be used for writing minidumps.
+  std::string minidump_dir() const;
+
+ private:
+  Status InitMinidumpExceptionHandler();
+  Status RegisterMinidumpExceptionHandler();
+  void UnregisterMinidumpExceptionHandler();
+
+  Status StartUserSignalHandlerThread();
+  void StopUserSignalHandlerThread();
+  void RunUserSignalHandlerThread();
+
+  // The number of instnaces of this class that are currently in existence.
+  // We keep this counter in order to force a crash if more than one is running
+  // at a time, as a sanity check.
+  static std::atomic<int> current_num_instances_;
+
+  #ifndef __APPLE__
+  std::atomic<bool> user_signal_handler_thread_running_;// Unused in macOS 
build.
+  #endif
+
+  scoped_refptr<Thread> user_signal_handler_thread_;
+
+  // Breakpad ExceptionHandler. It registers its own signal handlers to write
+  // minidump files during process crashes, but can also be used to write
+  // minidumps directly.
+  std::unique_ptr<google_breakpad::ExceptionHandler> breakpad_handler_;
+
+  // Directory in which we store our minidumps.
+  std::string minidump_dir_;
+};
+
+// Block SIGUSR1 from threads handling it.
+// This should be called by the process before spawning any threads so that a
+// USR1 signal will cause a minidump to be generated instead of a crash.
+Status BlockSigUSR1();
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/monotime-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/monotime-test.cc 
b/be/src/kudu/util/monotime-test.cc
new file mode 100644
index 0000000..f193ecd
--- /dev/null
+++ b/be/src/kudu/util/monotime-test.cc
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/monotime.h"
+
+#include <sys/time.h>
+#include <unistd.h>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+
+TEST(TestMonoTime, TestMonotonicity) {
+  alarm(360);
+  MonoTime prev(MonoTime::Now());
+  MonoTime next;
+
+  do {
+    next = MonoTime::Now();
+    //LOG(INFO) << " next = " << next.ToString();
+  } while (!prev.ComesBefore(next));
+  ASSERT_FALSE(next.ComesBefore(prev));
+  alarm(0);
+}
+
+TEST(TestMonoTime, TestComparison) {
+  MonoTime now(MonoTime::Now());
+  MonoTime future(now);
+  future.AddDelta(MonoDelta::FromNanoseconds(1L));
+
+  ASSERT_GT((future - now).ToNanoseconds(), 0);
+  ASSERT_LT((now - future).ToNanoseconds(), 0);
+  ASSERT_EQ((now - now).ToNanoseconds(), 0);
+
+  MonoDelta nano(MonoDelta::FromNanoseconds(1L));
+  MonoDelta mil(MonoDelta::FromMilliseconds(1L));
+  MonoDelta sec(MonoDelta::FromSeconds(1.0));
+
+  ASSERT_TRUE(nano.LessThan(mil));
+  ASSERT_TRUE(mil.LessThan(sec));
+  ASSERT_TRUE(mil.MoreThan(nano));
+  ASSERT_TRUE(sec.MoreThan(mil));
+}
+
+TEST(TestMonoTime, TestTimeVal) {
+  struct timeval tv;
+  tv.tv_sec = 0;
+  tv.tv_usec = 0;
+
+  // Normal conversion case.
+  MonoDelta one_sec_one_micro(MonoDelta::FromNanoseconds(1000001000L));
+  one_sec_one_micro.ToTimeVal(&tv);
+  ASSERT_EQ(1, tv.tv_sec);
+  ASSERT_EQ(1, tv.tv_usec);
+
+  // Case where we are still positive but sub-micro.
+  // Round up to nearest microsecond. This is to avoid infinite timeouts
+  // in APIs that take a struct timeval.
+  MonoDelta zero_sec_one_nano(MonoDelta::FromNanoseconds(1L));
+  zero_sec_one_nano.ToTimeVal(&tv);
+  ASSERT_EQ(0, tv.tv_sec);
+  ASSERT_EQ(1, tv.tv_usec); // Special case: 1ns rounds up to
+
+  // Negative conversion case. Ensure the timeval is normalized.
+  // That means sec is negative and usec is positive.
+  MonoDelta neg_micro(MonoDelta::FromMicroseconds(-1L));
+  ASSERT_EQ(-1000, neg_micro.ToNanoseconds());
+  neg_micro.ToTimeVal(&tv);
+  ASSERT_EQ(-1, tv.tv_sec);
+  ASSERT_EQ(999999, tv.tv_usec);
+
+  // Case where we are still negative but sub-micro.
+  // Round up to nearest microsecond. This is to avoid infinite timeouts
+  // in APIs that take a struct timeval and for consistency.
+  MonoDelta zero_sec_neg_one_nano(MonoDelta::FromNanoseconds(-1L));
+  zero_sec_neg_one_nano.ToTimeVal(&tv);
+  ASSERT_EQ(-1, tv.tv_sec);
+  ASSERT_EQ(999999, tv.tv_usec);
+}
+
+TEST(TestMonoTime, TestTimeSpec) {
+  MonoTime one_sec_one_nano_expected(1000000001L);
+  struct timespec ts;
+  ts.tv_sec = 1;
+  ts.tv_nsec = 1;
+  MonoTime one_sec_one_nano_actual(ts);
+  ASSERT_EQ(0, 
one_sec_one_nano_expected.GetDeltaSince(one_sec_one_nano_actual).ToNanoseconds());
+
+  MonoDelta zero_sec_two_nanos(MonoDelta::FromNanoseconds(2L));
+  zero_sec_two_nanos.ToTimeSpec(&ts);
+  ASSERT_EQ(0, ts.tv_sec);
+  ASSERT_EQ(2, ts.tv_nsec);
+
+  // Negative conversion case. Ensure the timespec is normalized.
+  // That means sec is negative and nsec is positive.
+  MonoDelta neg_nano(MonoDelta::FromNanoseconds(-1L));
+  ASSERT_EQ(-1, neg_nano.ToNanoseconds());
+  neg_nano.ToTimeSpec(&ts);
+  ASSERT_EQ(-1, ts.tv_sec);
+  ASSERT_EQ(999999999, ts.tv_nsec);
+
+}
+
+TEST(TestMonoTime, TestDeltas) {
+  alarm(360);
+  const MonoDelta max_delta(MonoDelta::FromSeconds(0.1));
+  MonoTime prev(MonoTime::Now());
+  MonoTime next;
+  MonoDelta cur_delta;
+  do {
+    next = MonoTime::Now();
+    cur_delta = next.GetDeltaSince(prev);
+  } while (cur_delta.LessThan(max_delta));
+  alarm(0);
+}
+
+TEST(TestMonoTime, TestDeltaConversions) {
+  // TODO: Reliably test MonoDelta::FromSeconds() considering floating-point 
rounding errors
+
+  MonoDelta mil(MonoDelta::FromMilliseconds(500));
+  ASSERT_EQ(500 * MonoTime::kNanosecondsPerMillisecond, mil.nano_delta_);
+
+  MonoDelta micro(MonoDelta::FromMicroseconds(500));
+  ASSERT_EQ(500 * MonoTime::kNanosecondsPerMicrosecond, micro.nano_delta_);
+
+  MonoDelta nano(MonoDelta::FromNanoseconds(500));
+  ASSERT_EQ(500, nano.nano_delta_);
+}
+
+static void DoTestMonoTimePerf() {
+  const MonoDelta max_delta(MonoDelta::FromMilliseconds(500));
+  uint64_t num_calls = 0;
+  MonoTime prev(MonoTime::Now());
+  MonoTime next;
+  MonoDelta cur_delta;
+  do {
+    next = MonoTime::Now();
+    cur_delta = next.GetDeltaSince(prev);
+    num_calls++;
+  } while (cur_delta.LessThan(max_delta));
+  LOG(INFO) << "DoTestMonoTimePerf():"
+        << num_calls << " in "
+        << max_delta.ToString() << " seconds.";
+}
+
+TEST(TestMonoTime, TestSleepFor) {
+  MonoTime start = MonoTime::Now();
+  MonoDelta sleep = MonoDelta::FromMilliseconds(100);
+  SleepFor(sleep);
+  MonoTime end = MonoTime::Now();
+  MonoDelta actualSleep = end.GetDeltaSince(start);
+  ASSERT_GE(actualSleep.ToNanoseconds(), sleep.ToNanoseconds());
+}
+
+TEST(TestMonoTime, TestSleepForOverflow) {
+  if (!AllowSlowTests()) {
+    LOG(INFO) << "Skipping test because it sleeps for ~4s";
+    return;
+  }
+
+  // This quantity (~4s sleep) overflows a 32-bit integer such that
+  // the value becomes 0.
+  MonoTime start = MonoTime::Now();
+  MonoDelta sleep = MonoDelta::FromNanoseconds(1L << 32);
+  SleepFor(sleep);
+  MonoTime end = MonoTime::Now();
+  MonoDelta actualSleep = end.GetDeltaSince(start);
+  ASSERT_GE(actualSleep.ToNanoseconds(), sleep.ToNanoseconds());
+}
+
+// Test functionality of the handy operators for MonoTime/MonoDelta objects.
+// The test assumes that the core functionality provided by the
+// MonoTime/MonoDelta objects are in place, and it tests that the operators
+// have the expected behavior expressed in terms of already existing,
+// semantically equivalent methods.
+TEST(TestMonoTime, TestOperators) {
+  // MonoTime& MonoTime::operator+=(const MonoDelta& delta);
+  {
+    MonoTime tmp = MonoTime::Now();
+    MonoTime start = tmp;
+    MonoDelta delta = MonoDelta::FromMilliseconds(100);
+    MonoTime o_end = start;
+    o_end += delta;
+    tmp.AddDelta(delta);
+    MonoTime m_end = tmp;
+    EXPECT_TRUE(m_end.Equals(o_end));
+  }
+
+  // MonoTime& MonoTime::operator-=(const MonoDelta& delta);
+  {
+    MonoTime tmp = MonoTime::Now();
+    MonoTime start = tmp;
+    MonoDelta delta = MonoDelta::FromMilliseconds(100);
+    MonoTime o_end = start;
+    o_end -= delta;
+    tmp.AddDelta(MonoDelta::FromNanoseconds(-delta.ToNanoseconds()));
+    MonoTime m_end = tmp;
+    EXPECT_TRUE(m_end.Equals(o_end));
+  }
+
+  // bool operator==(const MonoDelta& lhs, const MonoDelta& rhs);
+  {
+    MonoDelta dn = MonoDelta::FromNanoseconds(0);
+    MonoDelta dm = MonoDelta::FromMicroseconds(0);
+    ASSERT_TRUE(dn.Equals(dm));
+    EXPECT_TRUE(dn == dm);
+    EXPECT_TRUE(dm == dn);
+  }
+
+  // bool operator!=(const MonoDelta& lhs, const MonoDelta& rhs);
+  {
+    MonoDelta dn = MonoDelta::FromNanoseconds(1);
+    MonoDelta dm = MonoDelta::FromMicroseconds(1);
+    ASSERT_FALSE(dn.Equals(dm));
+    EXPECT_TRUE(dn != dm);
+    EXPECT_TRUE(dm != dn);
+  }
+
+  // bool operator<(const MonoDelta& lhs, const MonoDelta& rhs);
+  {
+    MonoDelta d0 = MonoDelta::FromNanoseconds(0);
+    MonoDelta d1 = MonoDelta::FromNanoseconds(1);
+    ASSERT_TRUE(d0.LessThan(d1));
+    EXPECT_TRUE(d0 < d1);
+  }
+
+  // bool operator<=(const MonoDelta& lhs, const MonoDelta& rhs);
+  {
+    MonoDelta d0 = MonoDelta::FromNanoseconds(0);
+    MonoDelta d1 = MonoDelta::FromNanoseconds(1);
+    ASSERT_TRUE(d0.LessThan(d1));
+    EXPECT_TRUE(d0 <= d1);
+
+    MonoDelta d20 = MonoDelta::FromNanoseconds(2);
+    MonoDelta d21 = MonoDelta::FromNanoseconds(2);
+    ASSERT_TRUE(d20.Equals(d21));
+    EXPECT_TRUE(d20 <= d21);
+  }
+
+  // bool operator>(const MonoDelta& lhs, const MonoDelta& rhs);
+  {
+    MonoDelta d0 = MonoDelta::FromNanoseconds(0);
+    MonoDelta d1 = MonoDelta::FromNanoseconds(1);
+    ASSERT_TRUE(d1.MoreThan(d0));
+    EXPECT_TRUE(d1 > d0);
+  }
+
+  // bool operator>=(const MonoDelta& lhs, const MonoDelta& rhs);
+  {
+    MonoDelta d0 = MonoDelta::FromNanoseconds(0);
+    MonoDelta d1 = MonoDelta::FromNanoseconds(1);
+    ASSERT_TRUE(d1.MoreThan(d0));
+    EXPECT_TRUE(d1 >= d1);
+
+    MonoDelta d20 = MonoDelta::FromNanoseconds(2);
+    MonoDelta d21 = MonoDelta::FromNanoseconds(2);
+    ASSERT_TRUE(d20.Equals(d21));
+    EXPECT_TRUE(d21 >= d20);
+  }
+
+  // bool operator==(const MonoTime& lhs, const MonoTime& rhs);
+  {
+    MonoTime t0 = MonoTime::Now();
+    MonoTime t1(t0);
+    ASSERT_TRUE(t0.Equals(t1));
+    ASSERT_TRUE(t1.Equals(t0));
+    EXPECT_TRUE(t0 == t1);
+    EXPECT_TRUE(t1 == t0);
+  }
+
+  // bool operator!=(const MonoTime& lhs, const MonoTime& rhs);
+  {
+    MonoTime t0 = MonoTime::Now();
+    MonoTime t1(t0 + MonoDelta::FromMilliseconds(100));
+    ASSERT_TRUE(!t0.Equals(t1));
+    ASSERT_TRUE(!t1.Equals(t0));
+    EXPECT_TRUE(t0 != t1);
+    EXPECT_TRUE(t1 != t0);
+  }
+
+  // bool operator<(const MonoTime& lhs, const MonoTime& rhs);
+  {
+    MonoTime t0 = MonoTime::Now();
+    MonoTime t1(t0 + MonoDelta::FromMilliseconds(100));
+    ASSERT_TRUE(t0.ComesBefore(t1));
+    ASSERT_FALSE(t1.ComesBefore(t0));
+    EXPECT_TRUE(t0 < t1);
+    EXPECT_FALSE(t1 < t0);
+  }
+
+  // bool operator<=(const MonoTime& lhs, const MonoTime& rhs);
+  {
+    MonoTime t00 = MonoTime::Now();
+    MonoTime t01(t00);
+    ASSERT_TRUE(t00.Equals(t00));
+    ASSERT_TRUE(t00.Equals(t01));
+    ASSERT_TRUE(t01.Equals(t00));
+    ASSERT_TRUE(t01.Equals(t01));
+    EXPECT_TRUE(t00 <= t00);
+    EXPECT_TRUE(t00 <= t01);
+    EXPECT_TRUE(t01 <= t00);
+    EXPECT_TRUE(t01 <= t01);
+
+    MonoTime t1(t00 + MonoDelta::FromMilliseconds(100));
+    ASSERT_TRUE(t00.ComesBefore(t1));
+    ASSERT_TRUE(t01.ComesBefore(t1));
+    ASSERT_FALSE(t1.ComesBefore(t00));
+    ASSERT_FALSE(t1.ComesBefore(t01));
+    EXPECT_TRUE(t00 <= t1);
+    EXPECT_TRUE(t01 <= t1);
+    EXPECT_FALSE(t1 <= t00);
+    EXPECT_FALSE(t1 <= t01);
+  }
+
+  // bool operator>(const MonoTime& lhs, const MonoTime& rhs);
+  {
+    MonoTime t0 = MonoTime::Now();
+    MonoTime t1(t0 + MonoDelta::FromMilliseconds(100));
+    ASSERT_TRUE(t0.ComesBefore(t1));
+    ASSERT_FALSE(t1.ComesBefore(t0));
+    EXPECT_TRUE(t0 < t1);
+    EXPECT_FALSE(t1 < t0);
+  }
+
+  // bool operator>=(const MonoTime& lhs, const MonoTime& rhs);
+  {
+    MonoTime t00 = MonoTime::Now();
+    MonoTime t01(t00);
+    ASSERT_TRUE(t00.Equals(t00));
+    ASSERT_TRUE(t00.Equals(t01));
+    ASSERT_TRUE(t01.Equals(t00));
+    ASSERT_TRUE(t01.Equals(t01));
+    EXPECT_TRUE(t00 >= t00);
+    EXPECT_TRUE(t00 >= t01);
+    EXPECT_TRUE(t01 >= t00);
+    EXPECT_TRUE(t01 >= t01);
+
+    MonoTime t1(t00 + MonoDelta::FromMilliseconds(100));
+    ASSERT_TRUE(t00.ComesBefore(t1));
+    ASSERT_TRUE(t01.ComesBefore(t1));
+    ASSERT_FALSE(t1.ComesBefore(t00));
+    ASSERT_FALSE(t1.ComesBefore(t01));
+    EXPECT_FALSE(t00 >= t1);
+    EXPECT_FALSE(t01 >= t1);
+    EXPECT_TRUE(t1 >= t00);
+    EXPECT_TRUE(t1 >= t01);
+  }
+
+  // MonoDelta operator-(const MonoTime& t0, const MonoTime& t1);
+  {
+    const int64_t deltas[] = { 100, -100 };
+
+    MonoTime tmp = MonoTime::Now();
+    for (auto d : deltas) {
+      MonoDelta delta = MonoDelta::FromMilliseconds(d);
+
+      MonoTime start = tmp;
+      tmp.AddDelta(delta);
+      MonoTime end = tmp;
+      MonoDelta delta_o = end - start;
+      EXPECT_TRUE(delta.Equals(delta_o));
+    }
+  }
+
+  // MonoTime operator+(const MonoTime& t, const MonoDelta& delta);
+  {
+    MonoTime start = MonoTime::Now();
+
+    MonoDelta delta_0 = MonoDelta::FromMilliseconds(0);
+    MonoTime end_0 = start + delta_0;
+    EXPECT_TRUE(end_0.Equals(start));
+
+    MonoDelta delta_1 = MonoDelta::FromMilliseconds(1);
+    MonoTime end_1 = start + delta_1;
+    EXPECT_TRUE(end_1 > end_0);
+    end_0.AddDelta(delta_1);
+    EXPECT_TRUE(end_0.Equals(end_1));
+  }
+
+  // MonoTime operator-(const MonoTime& t, const MonoDelta& delta);
+  {
+    MonoTime start = MonoTime::Now();
+
+    MonoDelta delta_0 = MonoDelta::FromMilliseconds(0);
+    MonoTime end_0 = start - delta_0;
+    EXPECT_TRUE(end_0.Equals(start));
+
+    MonoDelta delta_1 = MonoDelta::FromMilliseconds(1);
+    MonoTime end_1 = start - delta_1;
+    EXPECT_TRUE(end_1 < end_0);
+    end_1.AddDelta(delta_1);
+    EXPECT_TRUE(end_1.Equals(end_0));
+  }
+}
+
+TEST(TestMonoTimePerf, TestMonoTimePerf) {
+  alarm(360);
+  DoTestMonoTimePerf();
+  alarm(0);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/monotime.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/monotime.cc b/be/src/kudu/util/monotime.cc
new file mode 100644
index 0000000..b67be42
--- /dev/null
+++ b/be/src/kudu/util/monotime.cc
@@ -0,0 +1,325 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/monotime.h"
+
+#include <glog/logging.h>
+#include <limits>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/time.h>
+#include <time.h>
+
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/sysinfo.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/thread_restrictions.h"
+
+namespace kudu {
+
+#define MAX_MONOTONIC_SECONDS \
+  (((1ULL<<63) - 1ULL) /(int64_t)MonoTime::kNanosecondsPerSecond)
+
+
+///
+/// MonoDelta
+///
+
+const int64_t MonoDelta::kUninitialized = kint64min;
+
+MonoDelta MonoDelta::FromSeconds(double seconds) {
+  int64_t delta = seconds * MonoTime::kNanosecondsPerSecond;
+  return MonoDelta(delta);
+}
+
+MonoDelta MonoDelta::FromMilliseconds(int64_t ms) {
+  return MonoDelta(ms * MonoTime::kNanosecondsPerMillisecond);
+}
+
+MonoDelta MonoDelta::FromMicroseconds(int64_t us) {
+  return MonoDelta(us * MonoTime::kNanosecondsPerMicrosecond);
+}
+
+MonoDelta MonoDelta::FromNanoseconds(int64_t ns) {
+  return MonoDelta(ns);
+}
+
+MonoDelta::MonoDelta()
+  : nano_delta_(kUninitialized) {
+}
+
+bool MonoDelta::Initialized() const {
+  return nano_delta_ != kUninitialized;
+}
+
+bool MonoDelta::LessThan(const MonoDelta &rhs) const {
+  DCHECK(Initialized());
+  DCHECK(rhs.Initialized());
+  return nano_delta_ < rhs.nano_delta_;
+}
+
+bool MonoDelta::MoreThan(const MonoDelta &rhs) const {
+  DCHECK(Initialized());
+  DCHECK(rhs.Initialized());
+  return nano_delta_ > rhs.nano_delta_;
+}
+
+bool MonoDelta::Equals(const MonoDelta &rhs) const {
+  DCHECK(Initialized());
+  DCHECK(rhs.Initialized());
+  return nano_delta_ == rhs.nano_delta_;
+}
+
+std::string MonoDelta::ToString() const {
+  return StringPrintf("%.3fs", ToSeconds());
+}
+
+MonoDelta::MonoDelta(int64_t delta)
+  : nano_delta_(delta) {
+}
+
+double MonoDelta::ToSeconds() const {
+  DCHECK(Initialized());
+  double d(nano_delta_);
+  d /= MonoTime::kNanosecondsPerSecond;
+  return d;
+}
+
+int64_t MonoDelta::ToNanoseconds() const {
+  DCHECK(Initialized());
+  return nano_delta_;
+}
+
+int64_t MonoDelta::ToMicroseconds() const {
+  DCHECK(Initialized());
+ return nano_delta_ / MonoTime::kNanosecondsPerMicrosecond;
+}
+
+int64_t MonoDelta::ToMilliseconds() const {
+  DCHECK(Initialized());
+  return nano_delta_ / MonoTime::kNanosecondsPerMillisecond;
+}
+
+void MonoDelta::ToTimeVal(struct timeval *tv) const {
+  DCHECK(Initialized());
+  tv->tv_sec = nano_delta_ / MonoTime::kNanosecondsPerSecond;
+  tv->tv_usec = (nano_delta_ - (tv->tv_sec * MonoTime::kNanosecondsPerSecond))
+      / MonoTime::kNanosecondsPerMicrosecond;
+
+  // tv_usec must be between 0 and 999999.
+  // There is little use for negative timevals so wrap it in PREDICT_FALSE.
+  if (PREDICT_FALSE(tv->tv_usec < 0)) {
+    --(tv->tv_sec);
+    tv->tv_usec += 1000000;
+  }
+
+  // Catch positive corner case where we "round down" and could potentially 
set a timeout of 0.
+  // Make it 1 usec.
+  if (PREDICT_FALSE(tv->tv_usec == 0 && tv->tv_sec == 0 && nano_delta_ > 0)) {
+    tv->tv_usec = 1;
+  }
+
+  // Catch negative corner case where we "round down" and could potentially 
set a timeout of 0.
+  // Make it -1 usec (but normalized, so tv_usec is not negative).
+  if (PREDICT_FALSE(tv->tv_usec == 0 && tv->tv_sec == 0 && nano_delta_ < 0)) {
+    tv->tv_sec = -1;
+    tv->tv_usec = 999999;
+  }
+}
+
+
+void MonoDelta::NanosToTimeSpec(int64_t nanos, struct timespec* ts) {
+  ts->tv_sec = nanos / MonoTime::kNanosecondsPerSecond;
+  ts->tv_nsec = nanos - (ts->tv_sec * MonoTime::kNanosecondsPerSecond);
+
+  // tv_nsec must be between 0 and 999999999.
+  // There is little use for negative timespecs so wrap it in PREDICT_FALSE.
+  if (PREDICT_FALSE(ts->tv_nsec < 0)) {
+    --(ts->tv_sec);
+    ts->tv_nsec += MonoTime::kNanosecondsPerSecond;
+  }
+}
+
+void MonoDelta::ToTimeSpec(struct timespec *ts) const {
+  DCHECK(Initialized());
+  NanosToTimeSpec(nano_delta_, ts);
+}
+
+///
+/// MonoTime
+///
+
+MonoTime MonoTime::Now() {
+#if defined(__APPLE__)
+  return MonoTime(walltime_internal::GetMonoTimeNanos());
+# else
+  struct timespec ts;
+  PCHECK(clock_gettime(CLOCK_MONOTONIC, &ts) == 0);
+  return MonoTime(ts);
+#endif // defined(__APPLE__)
+}
+
+MonoTime MonoTime::Max() {
+  return MonoTime(std::numeric_limits<int64_t>::max());
+}
+
+MonoTime MonoTime::Min() {
+  return MonoTime(1);
+}
+
+const MonoTime& MonoTime::Earliest(const MonoTime& a, const MonoTime& b) {
+  if (b.nanos_ < a.nanos_) {
+    return b;
+  }
+  return a;
+}
+
+MonoTime::MonoTime()
+  : nanos_(0) {
+}
+
+bool MonoTime::Initialized() const {
+  return nanos_ != 0;
+}
+
+MonoDelta MonoTime::GetDeltaSince(const MonoTime &rhs) const {
+  DCHECK(Initialized());
+  DCHECK(rhs.Initialized());
+  int64_t delta(nanos_);
+  delta -= rhs.nanos_;
+  return MonoDelta(delta);
+}
+
+void MonoTime::AddDelta(const MonoDelta &delta) {
+  DCHECK(Initialized());
+  nanos_ += delta.nano_delta_;
+}
+
+bool MonoTime::ComesBefore(const MonoTime &rhs) const {
+  DCHECK(Initialized());
+  DCHECK(rhs.Initialized());
+  return nanos_ < rhs.nanos_;
+}
+
+std::string MonoTime::ToString() const {
+  return StringPrintf("%.3fs", ToSeconds());
+}
+
+bool MonoTime::Equals(const MonoTime& other) const {
+  return nanos_ == other.nanos_;
+}
+
+MonoTime& MonoTime::operator+=(const MonoDelta& delta) {
+  this->AddDelta(delta);
+  return *this;
+}
+
+MonoTime& MonoTime::operator-=(const MonoDelta& delta) {
+  this->AddDelta(MonoDelta(-1 * delta.nano_delta_));
+  return *this;
+}
+
+MonoTime::MonoTime(const struct timespec &ts) {
+  // Monotonic time resets when the machine reboots.  The 64-bit limitation
+  // means that we can't represent times larger than 292 years, which should be
+  // adequate.
+  CHECK_LT(ts.tv_sec, MAX_MONOTONIC_SECONDS);
+  nanos_ = ts.tv_sec;
+  nanos_ *= MonoTime::kNanosecondsPerSecond;
+  nanos_ += ts.tv_nsec;
+}
+
+MonoTime::MonoTime(int64_t nanos)
+  : nanos_(nanos) {
+}
+
+double MonoTime::ToSeconds() const {
+  double d(nanos_);
+  d /= MonoTime::kNanosecondsPerSecond;
+  return d;
+}
+
+void SleepFor(const MonoDelta& delta) {
+  ThreadRestrictions::AssertWaitAllowed();
+  base::SleepForNanoseconds(delta.ToNanoseconds());
+}
+
+bool operator==(const MonoDelta &lhs, const MonoDelta &rhs) {
+  return lhs.Equals(rhs);
+}
+
+bool operator!=(const MonoDelta &lhs, const MonoDelta &rhs) {
+  return !lhs.Equals(rhs);
+}
+
+bool operator<(const MonoDelta &lhs, const MonoDelta &rhs) {
+  return lhs.LessThan(rhs);
+}
+
+bool operator<=(const MonoDelta &lhs, const MonoDelta &rhs) {
+  return lhs.LessThan(rhs) || lhs.Equals(rhs);
+}
+
+bool operator>(const MonoDelta &lhs, const MonoDelta &rhs) {
+  return lhs.MoreThan(rhs);
+}
+
+bool operator>=(const MonoDelta &lhs, const MonoDelta &rhs) {
+  return lhs.MoreThan(rhs) || lhs.Equals(rhs);
+}
+
+bool operator==(const MonoTime& lhs, const MonoTime& rhs) {
+  return lhs.Equals(rhs);
+}
+
+bool operator!=(const MonoTime& lhs, const MonoTime& rhs) {
+  return !lhs.Equals(rhs);
+}
+
+bool operator<(const MonoTime& lhs, const MonoTime& rhs) {
+  return lhs.ComesBefore(rhs);
+}
+
+bool operator<=(const MonoTime& lhs, const MonoTime& rhs) {
+  return lhs.ComesBefore(rhs) || lhs.Equals(rhs);
+}
+
+bool operator>(const MonoTime& lhs, const MonoTime& rhs) {
+  return rhs.ComesBefore(lhs);
+}
+
+bool operator>=(const MonoTime& lhs, const MonoTime& rhs) {
+  return rhs.ComesBefore(lhs) || rhs.Equals(lhs);
+}
+
+MonoTime operator+(const MonoTime& t, const MonoDelta& delta) {
+  MonoTime tmp(t);
+  tmp.AddDelta(delta);
+  return tmp;
+}
+
+MonoTime operator-(const MonoTime& t, const MonoDelta& delta) {
+  MonoTime tmp(t);
+  tmp.AddDelta(MonoDelta::FromNanoseconds(-delta.ToNanoseconds()));
+  return tmp;
+}
+
+MonoDelta operator-(const MonoTime& t_end, const MonoTime& t_beg) {
+  return t_end.GetDeltaSince(t_beg);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/monotime.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/monotime.h b/be/src/kudu/util/monotime.h
new file mode 100644
index 0000000..007c54d
--- /dev/null
+++ b/be/src/kudu/util/monotime.h
@@ -0,0 +1,412 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_MONOTIME_H
+#define KUDU_UTIL_MONOTIME_H
+
+#include <stdint.h>
+#include <string>
+
+#ifdef KUDU_HEADERS_NO_STUBS
+#include <gtest/gtest_prod.h>
+#else
+// This is a poor module interdependency, but the stubs are header-only and
+// it's only for exported header builds, so we'll make an exception.
+#include "kudu/client/stubs.h"
+#endif
+
+#include "kudu/util/kudu_export.h"
+
+struct timeval;
+struct timespec;
+
+namespace kudu {
+class MonoTime;
+
+/// @brief A representation of a time interval.
+///
+/// The MonoDelta class represents an elapsed duration of time -- i.e.
+/// the delta between two MonoTime instances.
+class KUDU_EXPORT MonoDelta {
+ public:
+  /// @name Converters from seconds representation (and ubiquitous SI 
prefixes).
+  ///
+  /// @param [in] seconds/ms/us/ns
+  ///   Time interval representation in seconds (with ubiquitous SI prefixes).
+  /// @return The resulting MonoDelta object initialized in accordance with
+  ///   the specified parameter.
+  ///
+  ///@{
+  static MonoDelta FromSeconds(double seconds);
+  static MonoDelta FromMilliseconds(int64_t ms);
+  static MonoDelta FromMicroseconds(int64_t us);
+  static MonoDelta FromNanoseconds(int64_t ns);
+  ///@}
+
+  /// Build a MonoDelta object.
+  ///
+  /// @note A MonoDelta instance built with the this default constructor is
+  ///   "uninitialized" and may not be used for any operation.
+  MonoDelta();
+
+  /// @return @c true iff this object is initialized.
+  bool Initialized() const;
+
+  /// Check whether this time interval is shorter than the specified one.
+  ///
+  /// @param [in] rhs
+  ///   A time interval for comparison.
+  /// @return @c true iff this time interval is strictly shorter
+  ///   than the specified one.
+  bool LessThan(const MonoDelta &rhs) const;
+
+  /// Check whether this time interval is longer than the specified one.
+  ///
+  /// @param [in] rhs
+  ///   A time interval for comparison.
+  /// @return @c true iff this time interval is strictly longer
+  ///   than the specified one.
+  bool MoreThan(const MonoDelta &rhs) const;
+
+  /// Check whether this time interval has the same duration
+  ///  as the specified one.
+  ///
+  /// @param [in] rhs
+  ///   A time interval for comparison.
+  /// @return @c true iff this time interval has the same duration as the
+  ///   the specified one.
+  bool Equals(const MonoDelta &rhs) const;
+
+  /// @return String representation of this interval's duration (in seconds).
+  std::string ToString() const;
+
+  /// @name Converters into seconds representation (and ubiquitous SI 
prefixes).
+  ///
+  /// @return Representation of the time interval in appropriate SI units.
+  ///
+  ///@{
+  double ToSeconds() const;
+  int64_t ToMilliseconds() const;
+  int64_t ToMicroseconds() const;
+  int64_t ToNanoseconds() const;
+  ///@}
+
+  /// Represent this time interval as a timeval structure, with microsecond
+  /// accuracy.
+  ///
+  /// @param [out] tv
+  ///   Placeholder for the result value.
+  void ToTimeVal(struct timeval *tv) const;
+
+  /// Represent this time interval as a timespec structure, with nanosecond
+  /// accuracy.
+  ///
+  /// @param [out] ts
+  ///   Placeholder for the result value.
+  void ToTimeSpec(struct timespec *ts) const;
+
+  /// Convert a nanosecond value to a timespec.
+  ///
+  /// @param [in] nanos
+  ///   Representation of a relative point in time in nanoseconds.
+  /// @param [out] ts
+  ///   Placeholder for the resulting timespec representation.
+  static void NanosToTimeSpec(int64_t nanos, struct timespec* ts);
+
+ private:
+  static const int64_t kUninitialized;
+
+  friend class MonoTime;
+  FRIEND_TEST(TestMonoTime, TestDeltaConversions);
+
+  explicit MonoDelta(int64_t delta);
+  int64_t nano_delta_;
+};
+
+/// @brief Representation of a particular point in time.
+///
+/// The MonoTime class represents a particular point in time,
+/// relative to some fixed but unspecified reference point.
+///
+/// This time is monotonic, meaning that if the user changes his or her system
+/// clock, the monotime does not change.
+class KUDU_EXPORT MonoTime {
+ public:
+  /// @name Conversion constants for ubiquitous time units.
+  ///
+  ///@{
+  static const int64_t kNanosecondsPerSecond = 1000000000L;
+  static const int64_t kNanosecondsPerMillisecond = 1000000L;
+  static const int64_t kNanosecondsPerMicrosecond = 1000L;
+
+  static const int64_t kMicrosecondsPerSecond = 1000000L;
+  ///@}
+
+  /// Get current time in MonoTime representation.
+  ///
+  /// @return Time specification for the moment of the method's invocation.
+  static MonoTime Now();
+
+  /// @return MonoTime equal to farthest possible time into the future.
+  static MonoTime Max();
+
+  /// @return MonoTime equal to farthest possible time into the past.
+  static MonoTime Min();
+
+  /// Select the earliest between the specified time points.
+  ///
+  /// @param [in] a
+  ///   The first MonoTime object to select from.
+  /// @param [in] b
+  ///   The second MonoTime object to select from.
+  /// @return The earliest (minimum) of the two monotimes.
+  static const MonoTime& Earliest(const MonoTime& a, const MonoTime& b);
+
+  /// Build a MonoTime object. The resulting object is not initialized
+  /// and not ready to use.
+  MonoTime();
+
+  /// @return @c true iff the object is initialized.
+  bool Initialized() const;
+
+  /// Compute time interval between the point in time specified by this
+  /// and the specified object.
+  ///
+  /// @param [in] rhs
+  ///   The object that corresponds to the left boundary of the time interval,
+  ///   where this object corresponds to the right boundary of the interval.
+  /// @return The resulting time interval represented as a MonoDelta object.
+  MonoDelta GetDeltaSince(const MonoTime &rhs) const;
+
+  /// Advance this object's time specification by the specified interval.
+  ///
+  /// @param [in] delta
+  ///   The time interval to add.
+  void AddDelta(const MonoDelta &delta);
+
+  /// Check whether the point in time specified by this object is earlier
+  /// than the specified one.
+  ///
+  /// @param [in] rhs
+  ///   The other MonoTime object to compare with.
+  /// @return @c true iff the point in time represented by this MonoTime object
+  ///   is earlier then the point in time represented by the parameter.
+  bool ComesBefore(const MonoTime &rhs) const;
+
+  /// @return String representation of the object (in seconds).
+  std::string ToString() const;
+
+  /// Check whether this object represents the same point in time as the other.
+  ///
+  /// @param [in] other
+  ///   The other MonoTime object to compare.
+  /// @return @c true iff the point in time represented by this MonoTime object
+  ///   is the same as the one represented by the other.
+  bool Equals(const MonoTime& other) const;
+
+  /// @name Syntactic sugar: increment/decrement operators for MonoTime.
+  ///@{
+  ///
+  /// Add a delta to the point in time represented by the object.
+  ///
+  /// @param [in] delta
+  ///   The delta to add.
+  /// @return Reference to the modified object.
+  MonoTime& operator+=(const MonoDelta& delta);
+
+  /// Substract a delta from the point in time represented by the object.
+  ///
+  /// @param [in] delta
+  ///   The delta to substract.
+  /// @return Reference to the modified object.
+  MonoTime& operator-=(const MonoDelta& delta);
+  ///@}
+
+ private:
+  friend class MonoDelta;
+  FRIEND_TEST(TestMonoTime, TestTimeSpec);
+  FRIEND_TEST(TestMonoTime, TestDeltaConversions);
+
+  explicit MonoTime(const struct timespec &ts);
+  explicit MonoTime(int64_t nanos);
+  double ToSeconds() const;
+  int64_t nanos_;
+};
+
+/// Sleep for an interval specified by a MonoDelta instance.
+///
+/// This is preferred over sleep(3), usleep(3), and nanosleep(3).
+/// It's less prone to mixups with units since it uses the MonoDelta for
+/// interval specification.
+/// Besides, it ignores signals/EINTR, so will reliably sleep at least for the
+/// MonoDelta duration.
+///
+/// @param [in] delta
+///   The time interval to sleep for.
+void KUDU_EXPORT SleepFor(const MonoDelta& delta);
+
+/// @name Syntactic sugar: binary operators for MonoDelta.
+///@{
+///
+/// @param [in] lhs
+///   A time interval for comparison: the left-hand operand.
+/// @param [in] rhs
+///   A time interval for comparison: the right-hand operand.
+/// @return @c true iff the time interval represented by @c lhs is equal
+///   to the time interval represented by @c rhs.
+bool KUDU_EXPORT operator==(const MonoDelta &lhs, const MonoDelta &rhs);
+
+/// @param [in] lhs
+///   A time interval for comparison: the left-hand operand.
+/// @param [in] rhs
+///   A time interval for comparison: the right-hand operand.
+/// @return @c true iff the time interval represented by @c lhs is not equal
+///   to the time interval represented by @c rhs.
+bool KUDU_EXPORT operator!=(const MonoDelta &lhs, const MonoDelta &rhs);
+
+/// @param [in] lhs
+///   A time interval for comparison: the left-hand operand.
+/// @param [in] rhs
+///   A time interval for comparison: the right-hand operand.
+/// @return @c true iff the time interval represented by @c lhs is shorter
+///   than the time interval represented by @c rhs.
+bool KUDU_EXPORT operator<(const MonoDelta &lhs, const MonoDelta &rhs);
+
+/// @param [in] lhs
+///   A time interval for comparison: the left-hand operand.
+/// @param [in] rhs
+///   A time interval for comparison: the right-hand operand.
+/// @return @c true iff the time interval represented by @c lhs is shorter
+///   than or equal to the time interval represented by @c rhs.
+bool KUDU_EXPORT operator<=(const MonoDelta &lhs, const MonoDelta &rhs);
+
+/// @param [in] lhs
+///   A time interval for comparison: the left-hand operand.
+/// @param [in] rhs
+///   A time interval for comparison: the right-hand operand.
+/// @return @c true iff the time interval represented by @c lhs is longer
+///   than the time interval represented by @c rhs.
+bool KUDU_EXPORT operator>(const MonoDelta &lhs, const MonoDelta &rhs);
+
+/// @param [in] lhs
+///   A time interval for comparison: the left-hand operand.
+/// @param [in] rhs
+///   A time interval for comparison: the right-hand operand.
+/// @return @c true iff the time interval represented by @c lhs is longer
+///   than or equal to the time interval represented by @c rhs.
+bool KUDU_EXPORT operator>=(const MonoDelta &lhs, const MonoDelta &rhs);
+///@}
+
+/// @name Syntactic sugar: binary operators for MonoTime.
+///@{
+///
+/// Check if the specified objects represent the same point in time.
+///
+/// This is a handy operator which is semantically equivalent to
+/// MonoTime::Equals().
+///
+/// @param [in] lhs
+///   The left-hand operand.
+/// @param [in] rhs
+///   The right-hand operand.
+/// @return @c true iff the given objects represent the same point in time.
+bool KUDU_EXPORT operator==(const MonoTime& lhs, const MonoTime& rhs);
+
+/// Check if the specified objects represent different points in time.
+///
+/// This is a handy operator which is semantically equivalent to the negation 
of
+/// MonoTime::Equals().
+///
+/// @param [in] lhs
+///   The left-hand operand.
+/// @param [in] rhs
+///   The right-hand operand.
+/// @return @c true iff the given object represents a different point in time
+///   than the specified one.
+bool KUDU_EXPORT operator!=(const MonoTime& lhs, const MonoTime& rhs);
+
+/// @param [in] lhs
+///   The left-hand operand.
+/// @param [in] rhs
+///   The right-hand operand.
+/// @return @c true iff the @c lhs object represents an earlier point in time
+///   than the @c rhs object.
+bool KUDU_EXPORT operator<(const MonoTime& lhs, const MonoTime& rhs);
+
+/// @param [in] lhs
+///   The left-hand operand.
+/// @param [in] rhs
+///   The right-hand operand.
+/// @return @c true iff the @c lhs object represents an earlier than or
+///   the same point in time as the @c rhs object.
+bool KUDU_EXPORT operator<=(const MonoTime& lhs, const MonoTime& rhs);
+
+/// @param [in] lhs
+///   The left-hand operand.
+/// @param [in] rhs
+///   The right-hand operand.
+/// @return @c true iff the @c lhs object represents a later point in time
+///   than the @c rhs object.
+bool KUDU_EXPORT operator>(const MonoTime& lhs, const MonoTime& rhs);
+
+/// @param [in] lhs
+///   The left-hand operand.
+/// @param [in] rhs
+///   The right-hand operand.
+/// @return @c true iff the @c lhs object represents a later than or
+///   the same point in time as the @c rhs object.
+bool KUDU_EXPORT operator>=(const MonoTime& lhs, const MonoTime& rhs);
+///@}
+
+/// @name Syntactic sugar: mixed binary operators for MonoTime/MonoDelta.
+///@{
+///
+/// Add the specified time interval to the given point in time.
+///
+/// @param [in] t
+///   A MonoTime object representing the given point in time.
+/// @param [in] delta
+///   A MonoDelta object representing the specified time interval.
+/// @return A MonoTime object representing the resulting point in time.
+MonoTime KUDU_EXPORT operator+(const MonoTime& t, const MonoDelta& delta);
+
+/// Subtract the specified time interval from the given point in time.
+///
+/// @param [in] t
+///   A MonoTime object representing the given point in time.
+/// @param [in] delta
+///   A MonoDelta object representing the specified time interval.
+/// @return A MonoTime object representing the resulting point in time.
+MonoTime KUDU_EXPORT operator-(const MonoTime& t, const MonoDelta& delta);
+
+/// Compute the time interval between the specified points in time.
+///
+/// Semantically, this is equivalent to t0.GetDeltaSince(t1).
+///
+/// @param [in] t_end
+///   The second point in time.  Semantically corresponds to the end
+///   of the resulting time interval.
+/// @param [in] t_beg
+///   The first point in time.  Semantically corresponds to the beginning
+///   of the resulting time interval.
+/// @return A MonoDelta object representing the time interval between the
+///   specified points in time.
+MonoDelta KUDU_EXPORT operator-(const MonoTime& t_end, const MonoTime& 
t_begin);
+///@}
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/mt-hdr_histogram-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/mt-hdr_histogram-test.cc 
b/be/src/kudu/util/mt-hdr_histogram-test.cc
new file mode 100644
index 0000000..879c5e3
--- /dev/null
+++ b/be/src/kudu/util/mt-hdr_histogram-test.cc
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <gtest/gtest.h>
+#include <gflags/gflags.h>
+#include <vector>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/hdr_histogram.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
+
+DEFINE_int32(histogram_test_num_threads, 16,
+    "Number of threads to spawn for mt-hdr_histogram test");
+DEFINE_uint64(histogram_test_num_increments_per_thread, 100000LU,
+    "Number of times to call Increment() per thread in mt-hdr_histogram test");
+
+using std::vector;
+
+namespace kudu {
+
+class MtHdrHistogramTest : public KuduTest {
+ public:
+  MtHdrHistogramTest() {
+    num_threads_ = FLAGS_histogram_test_num_threads;
+    num_times_ = FLAGS_histogram_test_num_increments_per_thread;
+  }
+
+ protected:
+  int num_threads_;
+  uint64_t num_times_;
+};
+
+// Increment a counter a bunch of times in the same bucket
+static void IncrementSameHistValue(HdrHistogram* hist, uint64_t value, 
uint64_t times) {
+  for (uint64_t i = 0; i < times; i++) {
+    hist->Increment(value);
+  }
+}
+
+TEST_F(MtHdrHistogramTest, ConcurrentWriteTest) {
+  const uint64_t kValue = 1LU;
+
+  HdrHistogram hist(100000LU, 3);
+
+  auto threads = new scoped_refptr<kudu::Thread>[num_threads_];
+  for (int i = 0; i < num_threads_; i++) {
+    CHECK_OK(kudu::Thread::Create("test", strings::Substitute("thread-$0", i),
+        IncrementSameHistValue, &hist, kValue, num_times_, &threads[i]));
+  }
+  for (int i = 0; i < num_threads_; i++) {
+    CHECK_OK(ThreadJoiner(threads[i].get()).Join());
+  }
+
+  HdrHistogram snapshot(hist);
+  ASSERT_EQ(num_threads_ * num_times_, snapshot.CountInBucketForValue(kValue));
+
+  delete[] threads;
+}
+
+// Copy while writing, then iterate to ensure copies are consistent.
+TEST_F(MtHdrHistogramTest, ConcurrentCopyWhileWritingTest) {
+  const int kNumCopies = 10;
+  const uint64_t kValue = 1;
+
+  HdrHistogram hist(100000LU, 3);
+
+  auto threads = new scoped_refptr<kudu::Thread>[num_threads_];
+  for (int i = 0; i < num_threads_; i++) {
+    CHECK_OK(kudu::Thread::Create("test", strings::Substitute("thread-$0", i),
+        IncrementSameHistValue, &hist, kValue, num_times_, &threads[i]));
+  }
+
+  // This is somewhat racy but the goal is to catch this issue at least
+  // most of the time. At the time of this writing, before fixing a bug where
+  // the total count stored in a copied histogram may not match its internal
+  // counts (under concurrent writes), this test fails for me on 100/100 runs.
+  vector<HdrHistogram *> snapshots;
+  ElementDeleter deleter(&snapshots);
+  for (int i = 0; i < kNumCopies; i++) {
+    snapshots.push_back(new HdrHistogram(hist));
+    SleepFor(MonoDelta::FromMicroseconds(100));
+  }
+  for (int i = 0; i < kNumCopies; i++) {
+    snapshots[i]->MeanValue(); // Will crash if underlying iterator is 
inconsistent.
+  }
+
+  for (int i = 0; i < num_threads_; i++) {
+    CHECK_OK(ThreadJoiner(threads[i].get()).Join());
+  }
+
+  delete[] threads;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/mt-metrics-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/mt-metrics-test.cc 
b/be/src/kudu/util/mt-metrics-test.cc
new file mode 100644
index 0000000..b4512fb
--- /dev/null
+++ b/be/src/kudu/util/mt-metrics-test.cc
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <vector>
+
+#include <boost/bind.hpp>
+#include <boost/function.hpp>
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/debug/leakcheck_disabler.h"
+#include "kudu/util/jsonwriter.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
+
+DEFINE_int32(mt_metrics_test_num_threads, 4,
+             "Number of threads to spawn in mt metrics tests");
+
+METRIC_DEFINE_entity(test_entity);
+
+namespace kudu {
+
+using debug::ScopedLeakCheckDisabler;
+using std::vector;
+
+class MultiThreadedMetricsTest : public KuduTest {
+ public:
+  static void RegisterCounters(const scoped_refptr<MetricEntity>& 
metric_entity,
+                               const string& name_prefix, int num_counters);
+
+  MetricRegistry registry_;
+};
+
+// Call increment on a Counter a bunch of times.
+static void CountWithCounter(scoped_refptr<Counter> counter, int 
num_increments) {
+  for (int i = 0; i < num_increments; i++) {
+    counter->Increment();
+  }
+}
+
+// Helper function that spawns and then joins a bunch of threads.
+static void RunWithManyThreads(boost::function<void()>* f, int num_threads) {
+  vector<scoped_refptr<kudu::Thread> > threads;
+  for (int i = 0; i < num_threads; i++) {
+    scoped_refptr<kudu::Thread> new_thread;
+    CHECK_OK(kudu::Thread::Create("test", StringPrintf("thread%d", i),
+          *f, &new_thread));
+    threads.push_back(new_thread);
+  }
+  for (int i = 0; i < num_threads; i++) {
+    ASSERT_OK(ThreadJoiner(threads[i].get()).Join());
+  }
+}
+
+METRIC_DEFINE_counter(test_entity, test_counter, "Test Counter",
+                      MetricUnit::kRequests, "Test counter");
+
+// Ensure that incrementing a counter is thread-safe.
+TEST_F(MultiThreadedMetricsTest, CounterIncrementTest) {
+  scoped_refptr<Counter> counter = new Counter(&METRIC_test_counter);
+  int num_threads = FLAGS_mt_metrics_test_num_threads;
+  int num_increments = 1000;
+  boost::function<void()> f =
+      boost::bind(CountWithCounter, counter, num_increments);
+  RunWithManyThreads(&f, num_threads);
+  ASSERT_EQ(num_threads * num_increments, counter->value());
+}
+
+// Helper function to register a bunch of counters in a loop.
+void MultiThreadedMetricsTest::RegisterCounters(
+    const scoped_refptr<MetricEntity>& metric_entity,
+    const string& name_prefix,
+    int num_counters) {
+  uint64_t tid = Env::Default()->gettid();
+  for (int i = 0; i < num_counters; i++) {
+    // This loop purposefully leaks metrics prototypes, because the metrics 
system
+    // expects the prototypes and their names to live forever. This is the only
+    // place we dynamically generate them for the purposes of a test, so it's 
easier
+    // to just leak them than to figure out a way to manage lifecycle of 
objects that
+    // are typically static.
+    ScopedLeakCheckDisabler disabler;
+
+    string name = strings::Substitute("$0-$1-$2", name_prefix, tid, i);
+    auto proto = new CounterPrototype(MetricPrototype::CtorArgs(
+        "test_entity", strdup(name.c_str()), "Test Counter",
+        MetricUnit::kOperations, "test counter"));
+    proto->Instantiate(metric_entity)->Increment();
+  }
+}
+
+// Ensure that adding a counter to a registry is thread-safe.
+TEST_F(MultiThreadedMetricsTest, AddCounterToRegistryTest) {
+  scoped_refptr<MetricEntity> entity = 
METRIC_ENTITY_test_entity.Instantiate(&registry_, "my-test");
+  int num_threads = FLAGS_mt_metrics_test_num_threads;
+  int num_counters = 1000;
+  boost::function<void()> f =
+      boost::bind(RegisterCounters, entity, "prefix", num_counters);
+  RunWithManyThreads(&f, num_threads);
+  ASSERT_EQ(num_threads * num_counters, 
entity->UnsafeMetricsMapForTests().size());
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/mt-threadlocal-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/mt-threadlocal-test.cc 
b/be/src/kudu/util/mt-threadlocal-test.cc
new file mode 100644
index 0000000..2390b82
--- /dev/null
+++ b/be/src/kudu/util/mt-threadlocal-test.cc
@@ -0,0 +1,349 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <glog/logging.h>
+#include <mutex>
+#include <string>
+#include <unordered_set>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/env.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/threadlocal.h"
+#include "kudu/util/threadlocal_cache.h"
+
+using std::string;
+using std::unordered_set;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace threadlocal {
+
+class ThreadLocalTest : public KuduTest {};
+
+const int kTargetCounterVal = 1000000;
+
+class Counter;
+typedef unordered_set<Counter*> CounterPtrSet;
+typedef Mutex RegistryLockType;
+typedef simple_spinlock CounterLockType;
+
+// Registry to provide reader access to the thread-local Counters.
+// The methods are only thread-safe if the calling thread holds the lock.
+class CounterRegistry {
+ public:
+  CounterRegistry() {
+  }
+
+  RegistryLockType* get_lock() const {
+    return &lock_;
+  }
+
+  bool RegisterUnlocked(Counter* counter) {
+    LOG(INFO) << "Called RegisterUnlocked()";
+    return InsertIfNotPresent(&counters_, counter);
+  }
+
+  bool UnregisterUnlocked(Counter* counter) {
+    LOG(INFO) << "Called UnregisterUnlocked()";
+    return counters_.erase(counter) > 0;
+  }
+
+  CounterPtrSet* GetCountersUnlocked() {
+    return &counters_;
+  }
+
+ private:
+  mutable RegistryLockType lock_;
+  CounterPtrSet counters_;
+  DISALLOW_COPY_AND_ASSIGN(CounterRegistry);
+};
+
+// A simple Counter class that registers itself with a CounterRegistry.
+class Counter {
+ public:
+  Counter(CounterRegistry* registry, int val)
+    : tid_(Env::Default()->gettid()),
+      registry_(CHECK_NOTNULL(registry)),
+      val_(val) {
+    LOG(INFO) << "Counter::~Counter(): tid = " << tid_ << ", addr = " << this 
<< ", val = " << val_;
+    std::lock_guard<RegistryLockType> reg_lock(*registry_->get_lock());
+    CHECK(registry_->RegisterUnlocked(this));
+  }
+
+  ~Counter() {
+    LOG(INFO) << "Counter::~Counter(): tid = " << tid_ << ", addr = " << this 
<< ", val = " << val_;
+    std::lock_guard<RegistryLockType> reg_lock(*registry_->get_lock());
+    std::lock_guard<CounterLockType> self_lock(lock_);
+    LOG(INFO) << tid_ << ": deleting self from registry...";
+    CHECK(registry_->UnregisterUnlocked(this));
+  }
+
+  uint64_t tid() {
+    return tid_;
+  }
+
+  CounterLockType* get_lock() const {
+    return &lock_;
+  }
+
+  void IncrementUnlocked() {
+    val_++;
+  }
+
+  int GetValueUnlocked() {
+    return val_;
+  }
+
+ private:
+  // We expect that most of the time this lock will be uncontended.
+  mutable CounterLockType lock_;
+
+  // TID of thread that constructed this object.
+  const uint64_t tid_;
+
+  // Register / unregister ourselves with this on construction / destruction.
+  CounterRegistry* const registry_;
+
+  // Current value of the counter.
+  int val_;
+
+  DISALLOW_COPY_AND_ASSIGN(Counter);
+};
+
+// Create a new THREAD_LOCAL Counter and loop an increment operation on it.
+static void RegisterCounterAndLoopIncr(CounterRegistry* registry,
+                                       CountDownLatch* counters_ready,
+                                       CountDownLatch* reader_ready,
+                                       CountDownLatch* counters_done,
+                                       CountDownLatch* reader_done) {
+  BLOCK_STATIC_THREAD_LOCAL(Counter, counter, registry, 0);
+  // Inform the reader that we are alive.
+  counters_ready->CountDown();
+  // Let the reader initialize before we start counting.
+  reader_ready->Wait();
+  // Now rock & roll on the counting loop.
+  for (int i = 0; i < kTargetCounterVal; i++) {
+    std::lock_guard<CounterLockType> l(*counter->get_lock());
+    counter->IncrementUnlocked();
+  }
+  // Let the reader know we're ready for him to verify our counts.
+  counters_done->CountDown();
+  // Wait until the reader is done before we exit the thread, which will call
+  // delete on the Counter.
+  reader_done->Wait();
+}
+
+// Iterate over the registered counters and their values.
+static uint64_t Iterate(CounterRegistry* registry, int expected_counters) {
+  uint64_t sum = 0;
+  int seen_counters = 0;
+  std::lock_guard<RegistryLockType> l(*registry->get_lock());
+  for (Counter* counter : *registry->GetCountersUnlocked()) {
+    uint64_t value;
+    {
+      std::lock_guard<CounterLockType> l(*counter->get_lock());
+      value = counter->GetValueUnlocked();
+    }
+    LOG(INFO) << "tid " << counter->tid() << " (counter " << counter << "): " 
<< value;
+    sum += value;
+    seen_counters++;
+  }
+  CHECK_EQ(expected_counters, seen_counters);
+  return sum;
+}
+
+static void TestThreadLocalCounters(CounterRegistry* registry, const int 
num_threads) {
+  LOG(INFO) << "Starting threads...";
+  vector<scoped_refptr<kudu::Thread> > threads;
+
+  CountDownLatch counters_ready(num_threads);
+  CountDownLatch reader_ready(1);
+  CountDownLatch counters_done(num_threads);
+  CountDownLatch reader_done(1);
+  for (int i = 0; i < num_threads; i++) {
+    scoped_refptr<kudu::Thread> new_thread;
+    CHECK_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i),
+        &RegisterCounterAndLoopIncr, registry, &counters_ready, &reader_ready,
+        &counters_done, &reader_done, &new_thread));
+    threads.push_back(new_thread);
+  }
+
+  // Wait for all threads to start and register their Counters.
+  counters_ready.Wait();
+  CHECK_EQ(0, Iterate(registry, num_threads));
+  LOG(INFO) << "--";
+
+  // Let the counters start spinning.
+  reader_ready.CountDown();
+
+  // Try to catch them in the act, just for kicks.
+  for (int i = 0; i < 2; i++) {
+    Iterate(registry, num_threads);
+    LOG(INFO) << "--";
+    SleepFor(MonoDelta::FromMicroseconds(1));
+  }
+
+  // Wait until they're done and assure they sum up properly.
+  counters_done.Wait();
+  LOG(INFO) << "Checking Counter sums...";
+  CHECK_EQ(kTargetCounterVal * num_threads, Iterate(registry, num_threads));
+  LOG(INFO) << "Counter sums add up!";
+  reader_done.CountDown();
+
+  LOG(INFO) << "Joining & deleting threads...";
+  for (scoped_refptr<kudu::Thread> thread : threads) {
+    CHECK_OK(ThreadJoiner(thread.get()).Join());
+  }
+  LOG(INFO) << "Done.";
+}
+
+TEST_F(ThreadLocalTest, TestConcurrentCounters) {
+  // Run this multiple times to ensure we don't leave remnants behind in the
+  // CounterRegistry.
+  CounterRegistry registry;
+  for (int i = 0; i < 3; i++) {
+    TestThreadLocalCounters(&registry, 8);
+  }
+}
+
+// Test class that stores a string in a static thread local member.
+// This class cannot be instantiated. The methods are all static.
+class ThreadLocalString {
+ public:
+  static void set(std::string value);
+  static const std::string& get();
+ private:
+  ThreadLocalString() {
+  }
+  DECLARE_STATIC_THREAD_LOCAL(std::string, value_);
+  DISALLOW_COPY_AND_ASSIGN(ThreadLocalString);
+};
+
+DEFINE_STATIC_THREAD_LOCAL(std::string, ThreadLocalString, value_);
+
+void ThreadLocalString::set(std::string value) {
+  INIT_STATIC_THREAD_LOCAL(std::string, value_);
+  *value_ = value;
+}
+
+const std::string& ThreadLocalString::get() {
+  INIT_STATIC_THREAD_LOCAL(std::string, value_);
+  return *value_;
+}
+
+static void RunAndAssign(CountDownLatch* writers_ready,
+                         CountDownLatch *readers_ready,
+                         CountDownLatch *all_done,
+                         CountDownLatch *threads_exiting,
+                         const std::string& in,
+                         std::string* out) {
+  writers_ready->Wait();
+  // Ensure it starts off as an empty string.
+  CHECK_EQ("", ThreadLocalString::get());
+  ThreadLocalString::set(in);
+
+  readers_ready->Wait();
+  out->assign(ThreadLocalString::get());
+  all_done->Wait();
+  threads_exiting->CountDown();
+}
+
+TEST_F(ThreadLocalTest, TestTLSMember) {
+  const int num_threads = 8;
+
+  vector<CountDownLatch*> writers_ready;
+  vector<CountDownLatch*> readers_ready;
+  vector<std::string*> out_strings;
+  vector<scoped_refptr<kudu::Thread> > threads;
+
+  ElementDeleter writers_deleter(&writers_ready);
+  ElementDeleter readers_deleter(&readers_ready);
+  ElementDeleter out_strings_deleter(&out_strings);
+
+  CountDownLatch all_done(1);
+  CountDownLatch threads_exiting(num_threads);
+
+  LOG(INFO) << "Starting threads...";
+  for (int i = 0; i < num_threads; i++) {
+    writers_ready.push_back(new CountDownLatch(1));
+    readers_ready.push_back(new CountDownLatch(1));
+    out_strings.push_back(new std::string());
+    scoped_refptr<kudu::Thread> new_thread;
+    CHECK_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i),
+        &RunAndAssign, writers_ready[i], readers_ready[i],
+        &all_done, &threads_exiting, Substitute("$0", i), out_strings[i], 
&new_thread));
+    threads.push_back(new_thread);
+  }
+
+  // Unlatch the threads in order.
+  LOG(INFO) << "Writing to thread locals...";
+  for (int i = 0; i < num_threads; i++) {
+    writers_ready[i]->CountDown();
+  }
+  LOG(INFO) << "Reading from thread locals...";
+  for (int i = 0; i < num_threads; i++) {
+    readers_ready[i]->CountDown();
+  }
+  all_done.CountDown();
+  // threads_exiting acts as a memory barrier.
+  threads_exiting.Wait();
+  for (int i = 0; i < num_threads; i++) {
+    ASSERT_EQ(Substitute("$0", i), *out_strings[i]);
+    LOG(INFO) << "Read " << *out_strings[i];
+  }
+
+  LOG(INFO) << "Joining & deleting threads...";
+  for (scoped_refptr<kudu::Thread> thread : threads) {
+    CHECK_OK(ThreadJoiner(thread.get()).Join());
+  }
+}
+
+TEST_F(ThreadLocalTest, TestThreadLocalCache) {
+  using TLC = ThreadLocalCache<int, string>;
+  TLC* tlc = TLC::GetInstance();
+
+  // Lookup in an empty cache should return nullptr.
+  ASSERT_EQ(nullptr, tlc->Lookup(0));
+
+  // Insert more items than the cache capacity.
+  const int kLastItem = TLC::kItemCapacity * 2;
+  for (int i = 1; i <= kLastItem ; i++) {
+    auto* item = tlc->EmplaceNew(i);
+    ASSERT_NE(nullptr, item);
+    *item = Substitute("item $0", i);
+  }
+
+  // Looking up the most recent items should return them.
+  string* item = tlc->Lookup(kLastItem);
+  ASSERT_NE(nullptr, item);
+  EXPECT_EQ(*item, Substitute("item $0", kLastItem));
+
+  // Looking up evicted items should return nullptr.
+  ASSERT_EQ(nullptr, tlc->Lookup(1));
+}
+
+} // namespace threadlocal
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/mutex.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/mutex.cc b/be/src/kudu/util/mutex.cc
new file mode 100644
index 0000000..bd9539b
--- /dev/null
+++ b/be/src/kudu/util/mutex.cc
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Portions (c) 2011 The Chromium Authors.
+
+#include "kudu/util/mutex.h"
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/debug-util.h"
+#include "kudu/util/env.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/trace.h"
+
+using std::string;
+using strings::Substitute;
+using strings::SubstituteAndAppend;
+
+#ifndef NDEBUG
+DEFINE_bool(debug_mutex_collect_stacktrace, false,
+            "Whether to collect a stacktrace on Mutex contention in a DEBUG 
build");
+TAG_FLAG(debug_mutex_collect_stacktrace, advanced);
+TAG_FLAG(debug_mutex_collect_stacktrace, hidden);
+#endif
+
+namespace kudu {
+
+Mutex::Mutex()
+#ifndef NDEBUG
+  : owning_tid_(0),
+    stack_trace_(new StackTrace())
+#endif
+{
+#ifndef NDEBUG
+  // In debug, setup attributes for lock error checking.
+  pthread_mutexattr_t mta;
+  int rv = pthread_mutexattr_init(&mta);
+  DCHECK_EQ(0, rv) << ". " << strerror(rv);
+  rv = pthread_mutexattr_settype(&mta, PTHREAD_MUTEX_ERRORCHECK);
+  DCHECK_EQ(0, rv) << ". " << strerror(rv);
+  rv = pthread_mutex_init(&native_handle_, &mta);
+  DCHECK_EQ(0, rv) << ". " << strerror(rv);
+  rv = pthread_mutexattr_destroy(&mta);
+  DCHECK_EQ(0, rv) << ". " << strerror(rv);
+#else
+  // In release, go with the default lock attributes.
+  pthread_mutex_init(&native_handle_, NULL);
+#endif
+}
+
+Mutex::~Mutex() {
+  int rv = pthread_mutex_destroy(&native_handle_);
+  DCHECK_EQ(0, rv) << ". " << strerror(rv);
+}
+
+bool Mutex::TryAcquire() {
+  int rv = pthread_mutex_trylock(&native_handle_);
+#ifndef NDEBUG
+  DCHECK(rv == 0 || rv == EBUSY) << ". " << strerror(rv) << ". " << 
GetOwnerThreadInfo();
+  if (rv == 0) {
+    CheckUnheldAndMark();
+  }
+#endif
+  return rv == 0;
+}
+
+void Mutex::Acquire() {
+  // Optimize for the case when mutexes are uncontended. If they
+  // are contended, we'll have to go to sleep anyway, so the extra
+  // cost of branch mispredictions is moot.
+  //
+  // TryAcquire() is implemented as a simple CompareAndSwap inside
+  // pthreads so this does not require a system call.
+  if (PREDICT_TRUE(TryAcquire())) {
+    return;
+  }
+
+  // If we weren't able to acquire the mutex immediately, then it's
+  // worth gathering timing information about the mutex acquisition.
+  MicrosecondsInt64 start_time = GetMonoTimeMicros();
+  int rv = pthread_mutex_lock(&native_handle_);
+  DCHECK_EQ(0, rv) << ". " << strerror(rv)
+#ifndef NDEBUG
+                   << ". " << GetOwnerThreadInfo()
+#endif
+  ; // NOLINT(whitespace/semicolon)
+  MicrosecondsInt64 end_time = GetMonoTimeMicros();
+
+  int64_t wait_time = end_time - start_time;
+  if (wait_time > 0) {
+    TRACE_COUNTER_INCREMENT("mutex_wait_us", wait_time);
+  }
+
+#ifndef NDEBUG
+  CheckUnheldAndMark();
+#endif
+}
+
+void Mutex::Release() {
+#ifndef NDEBUG
+  CheckHeldAndUnmark();
+#endif
+  int rv = pthread_mutex_unlock(&native_handle_);
+  DCHECK_EQ(0, rv) << ". " << strerror(rv);
+}
+
+#ifndef NDEBUG
+void Mutex::AssertAcquired() const {
+  DCHECK_EQ(Env::Default()->gettid(), owning_tid_);
+}
+
+void Mutex::CheckHeldAndUnmark() {
+  AssertAcquired();
+  owning_tid_ = 0;
+  if (FLAGS_debug_mutex_collect_stacktrace) {
+    stack_trace_->Reset();
+  }
+}
+
+void Mutex::CheckUnheldAndMark() {
+  DCHECK_EQ(0, owning_tid_);
+  owning_tid_ = Env::Default()->gettid();
+  if (FLAGS_debug_mutex_collect_stacktrace) {
+    stack_trace_->Collect();
+  }
+}
+
+string Mutex::GetOwnerThreadInfo() const {
+  string str = Substitute("Owner tid: $0; Self tid: $1; ", owning_tid_, 
Env::Default()->gettid());
+  if (FLAGS_debug_mutex_collect_stacktrace) {
+    SubstituteAndAppend(&str, "Owner stack:\n$0", stack_trace_->Symbolize());
+  } else {
+    str += "To collect the owner stack trace, enable the flag 
--debug_mutex_collect_stacktrace";
+  }
+  return str;
+}
+
+#endif
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/mutex.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/mutex.h b/be/src/kudu/util/mutex.h
new file mode 100644
index 0000000..9277ac0
--- /dev/null
+++ b/be/src/kudu/util/mutex.h
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_MUTEX_H
+#define KUDU_UTIL_MUTEX_H
+
+#include <pthread.h>
+#include <sys/types.h>
+
+#include <string>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+
+namespace kudu {
+
+class StackTrace;
+
+// A lock built around pthread_mutex_t. Does not allow recursion.
+//
+// The following checks will be performed in DEBUG mode:
+//   Acquire(), TryAcquire() - the lock isn't already held.
+//   Release() - the lock is already held by this thread.
+//
+class Mutex {
+ public:
+  Mutex();
+  ~Mutex();
+
+  void Acquire();
+  void Release();
+  bool TryAcquire();
+
+  void lock() { Acquire(); }
+  void unlock() { Release(); }
+  bool try_lock() { return TryAcquire(); }
+
+#ifndef NDEBUG
+  void AssertAcquired() const;
+#else
+  void AssertAcquired() const {}
+#endif
+
+ private:
+  friend class ConditionVariable;
+
+  pthread_mutex_t native_handle_;
+
+#ifndef NDEBUG
+  // Members and routines taking care of locks assertions.
+  void CheckHeldAndUnmark();
+  void CheckUnheldAndMark();
+  std::string GetOwnerThreadInfo() const;
+
+  // All private data is implicitly protected by native_handle_.
+  // Be VERY careful to only access members under that lock.
+  pid_t owning_tid_;
+  gscoped_ptr<StackTrace> stack_trace_;
+#endif
+
+  DISALLOW_COPY_AND_ASSIGN(Mutex);
+};
+
+// A helper class that acquires the given Lock while the MutexLock is in scope.
+class MutexLock {
+ public:
+  struct AlreadyAcquired {};
+
+  // Acquires 'lock' (must be unheld) and wraps around it.
+  //
+  // Sample usage:
+  // {
+  //   MutexLock l(lock_); // acquired
+  //   ...
+  // } // released
+  explicit MutexLock(Mutex& lock)
+    : lock_(&lock),
+      owned_(true) {
+    lock_->Acquire();
+  }
+
+  // Wraps around 'lock' (must already be held by this thread).
+  //
+  // Sample usage:
+  // {
+  //   lock_.Acquire(); // acquired
+  //   ...
+  //   MutexLock l(lock_, AlreadyAcquired());
+  //   ...
+  // } // released
+  MutexLock(Mutex& lock, const AlreadyAcquired&)
+    : lock_(&lock),
+      owned_(true) {
+    lock_->AssertAcquired();
+  }
+
+  void Lock() {
+    DCHECK(!owned_);
+    lock_->Acquire();
+    owned_ = true;
+  }
+
+  void Unlock() {
+    DCHECK(owned_);
+    lock_->AssertAcquired();
+    lock_->Release();
+    owned_ = false;
+  }
+
+  ~MutexLock() {
+    if (owned_) {
+      Unlock();
+    }
+  }
+
+  bool OwnsLock() const {
+    return owned_;
+  }
+
+ private:
+  Mutex* lock_;
+  bool owned_;
+  DISALLOW_COPY_AND_ASSIGN(MutexLock);
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_MUTEX_H */

Reply via email to