http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/minidump.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/minidump.cc b/be/src/kudu/util/minidump.cc new file mode 100644 index 0000000..c088ae7 --- /dev/null +++ b/be/src/kudu/util/minidump.cc @@ -0,0 +1,378 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/minidump.h" + +#include <signal.h> +#include <sys/types.h> +#include <unistd.h> + +#include <atomic> +#include <memory> +#include <string> + +#if defined(__linux__) +#include <breakpad/client/linux/handler/exception_handler.h> +#include <breakpad/common/linux/linux_libc_support.h> +#endif // defined(__linux__) + +#include <gflags/gflags.h> +#include <glog/logging.h> + +#include "kudu/gutil/linux_syscall_support.h" +#include "kudu/gutil/strings/human_readable.h" +#include "kudu/util/errno.h" +#include "kudu/util/env.h" +#include "kudu/util/env_util.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/logging.h" +#include "kudu/util/path_util.h" +#include "kudu/util/status.h" + +using kudu::env_util::CreateDirIfMissing; +using std::string; + +#if defined(__linux__) +static constexpr bool kMinidumpPlatformSupported = true; +#else +static constexpr bool kMinidumpPlatformSupported = false; +#endif // defined(__linux__) + +DECLARE_string(log_dir); + +DEFINE_bool(enable_minidumps, kMinidumpPlatformSupported, + "Whether to enable minidump generation upon process crash or SIGUSR1. " + "Currently only supported on Linux systems."); +TAG_FLAG(enable_minidumps, advanced); +TAG_FLAG(enable_minidumps, evolving); +static bool ValidateMinidumpEnabled(const char* /*flagname*/, bool value) { + if (value && !kMinidumpPlatformSupported) { + return false; // NOLINT(*) + } + return true; +} +DEFINE_validator(enable_minidumps, &ValidateMinidumpEnabled); + +DEFINE_string(minidump_path, "minidumps", "Directory to write minidump files to. This " + "can be either an absolute path or a path relative to --log_dir. Each daemon will " + "create an additional sub-directory to prevent naming conflicts and to make it " + "easier to identify a crashing daemon. Minidump files contain crash-related " + "information in a compressed format. 
Minidumps will be written when a daemon exits " + "unexpectedly, for example on an unhandled exception or signal, or when a " + "SIGUSR1 signal is sent to the process. Cannot be set to an empty value."); +TAG_FLAG(minidump_path, evolving); +// The minidump path cannot be empty. +static bool ValidateMinidumpPath(const char* /*flagname*/, const string& value) { + return !value.empty(); +} +DEFINE_validator(minidump_path, &ValidateMinidumpPath); + +DEFINE_int32(max_minidumps, 9, "Maximum number of minidump files to keep per daemon. " + "Older files are removed first. Set to 0 to keep all minidump files."); +TAG_FLAG(max_minidumps, evolving); + +DEFINE_int32(minidump_size_limit_hint_kb, 20480, "Size limit hint for minidump files in " + "KB. If a minidump exceeds this value, then breakpad will reduce the stack memory it " + "collects for each thread from 8KB to 2KB. However it will always include the full " + "stack memory for the first 20 threads, including the thread that crashed."); +TAG_FLAG(minidump_size_limit_hint_kb, advanced); +TAG_FLAG(minidump_size_limit_hint_kb, evolving); + +#if !defined(__linux__) +namespace google_breakpad { +// Define this as an empty class to avoid an undefined symbol error on Mac. +class ExceptionHandler { + public: + ExceptionHandler() {} + ~ExceptionHandler() {} +}; +} // namespace google_breakpad +#endif // !defined(__linux__) + +namespace kudu { + +static sigset_t GetSigset(int signo) { + sigset_t signals; + CHECK_EQ(0, sigemptyset(&signals)); + CHECK_EQ(0, sigaddset(&signals, signo)); + return signals; +} + +#if defined(__linux__) + +// Called by the exception handler before minidump is produced. +// Minidump is only written if this returns true. +static bool FilterCallback(void* /*context*/) { + return true; +} + +// Write two null-terminated strings and a newline to both stdout and stderr. 
+static void WriteLineStdoutStderr(const char* msg1, const char* msg2) { + // We use Breakpad's reimplementation of strlen(), called my_strlen(), from + // linux_libc_support.h to avoid calling into libc. + // A comment from linux_libc_support.h is reproduced here: + // "This header provides replacements for libc functions that we need. If we + // call the libc functions directly we risk crashing in the dynamic linker as + // it tries to resolve uncached PLT entries." + int msg1_len = my_strlen(msg1); + int msg2_len = my_strlen(msg2); + + // We use sys_write() from linux_syscall_support.h here per the + // recommendation of the breakpad docs for the same reasons as above. + for (int fd : {STDOUT_FILENO, STDERR_FILENO}) { + sys_write(fd, msg1, msg1_len); + sys_write(fd, msg2, msg2_len); + sys_write(fd, "\n", 1); + } +} + +// Callback for breakpad. It is called whenever a minidump file has been +// written and should not be called directly. It logs the event before breakpad +// crashes the process. Due to the process being in a failed state we write to +// stdout/stderr and let the surrounding redirection make sure the output gets +// logged. The calls might still fail in unknown scenarios as the process is in +// a broken state. However we don't rely on them as the minidump file has been +// written already. +static bool DumpCallback(const google_breakpad::MinidumpDescriptor& descriptor, + void* context, bool succeeded) { + + // Indicate whether a minidump file was written successfully. Write message + // to stdout/stderr, which will usually be captured in the INFO/ERROR log. + if (succeeded) { + WriteLineStdoutStderr("Wrote minidump to ", descriptor.path()); + } else { + WriteLineStdoutStderr("Failed to write minidump to ", descriptor.path()); + } + + // If invoked by a user signal, return the actual success or failure of + // writing the minidump file so that we can print a user-friendly error + // message if writing the minidump fails. 
+ bool is_user_signal = context != nullptr && *reinterpret_cast<bool*>(context); + if (is_user_signal) { + return succeeded; + } + + // For crash signals. If we didn't want to invoke the previously-installed + // signal handler from glog, we would return the value received in + // 'succeeded' as described in the breakpad documentation. If this callback + // function returned true, breakpad would not invoke the previously-installed + // signal handler; instead, it would invoke the default signal handler, which + // would cause the process to crash immediately after writing the minidump. + // + // We make this callback always return false so that breakpad will invoke any + // previously-installed signal handler afterward. We want that to happen + // because the glog signal handlers print a helpful stacktrace on crash. + // That's convenient to have, because unlike a minidump, it doesn't need to + // be decoded to be useful for debugging. + return false; +} + +// Failure function that simply calls abort(). +static void AbortFailureFunction() { + abort(); +} + +bool MinidumpExceptionHandler::WriteMinidump() { + bool user_signal = true; + return google_breakpad::ExceptionHandler::WriteMinidump(minidump_dir(), + &DumpCallback, + &user_signal); +} + +Status MinidumpExceptionHandler::InitMinidumpExceptionHandler() { + minidump_dir_ = FLAGS_minidump_path; + if (minidump_dir_[0] != '/') { + minidump_dir_ = JoinPathSegments(FLAGS_log_dir, minidump_dir_); + } + + // Create the first-level minidump directory. + Env* env = Env::Default(); + RETURN_NOT_OK_PREPEND(CreateDirIfMissing(env, minidump_dir_), + "Error creating top-level minidump directory"); + + // Add the executable name to the path where minidumps will be written. This makes + // identification easier and prevents name collisions between the files. + // This is also consistent with how Impala organizes its minidump files. 
+ const char* exe_name = gflags::ProgramInvocationShortName(); + minidump_dir_ = JoinPathSegments(minidump_dir_, exe_name); + + // Create the directory if it is not there. The minidump doesn't get written if there is + // no directory. + RETURN_NOT_OK_PREPEND(CreateDirIfMissing(env, minidump_dir_), + "Error creating minidump directory"); + + // Verify that the minidump directory really is a directory. We canonicalize + // in case it's a symlink to a directory. + string canonical_minidump_path; + RETURN_NOT_OK(env->Canonicalize(minidump_dir_, &canonical_minidump_path)); + bool is_dir; + RETURN_NOT_OK(env->IsDirectory(canonical_minidump_path, &is_dir)); + if (!is_dir) { + return Status::IOError("Unable to create minidump directory", canonical_minidump_path); + } + + google_breakpad::MinidumpDescriptor desc(minidump_dir_); + + // Limit filesize if configured. + if (FLAGS_minidump_size_limit_hint_kb > 0) { + size_t size_limit = 1024 * static_cast<int64_t>(FLAGS_minidump_size_limit_hint_kb); + LOG(INFO) << "Setting minidump size limit to " + << HumanReadableNumBytes::ToStringWithoutRounding(size_limit); + desc.set_size_limit(size_limit); + } + + // If we don't uninstall the glog failure function when minidumps are enabled + // then we get two (2) stack traces printed from a LOG(FATAL) or CHECK(): one + // from the glog failure function and one from the glog signal handler. That + // is because we always return false in DumpCallback() in the non-user signal + // case. + google::InstallFailureFunction(&AbortFailureFunction); + + breakpad_handler_.reset( + new google_breakpad::ExceptionHandler(desc, // Path to minidump directory. + FilterCallback, // Indicates whether to write the dump. + DumpCallback, // Output a log message when dumping. + nullptr, // Optional context for callbacks. + true, // Whether to install a crash handler. + -1)); // -1: Use in-process dump generation. 
+ + return Status::OK(); +} + +Status MinidumpExceptionHandler::RegisterMinidumpExceptionHandler() { + if (!FLAGS_enable_minidumps) return Status::OK(); + + // Ensure only one active instance is alive per process at any given time. + CHECK_EQ(0, MinidumpExceptionHandler::current_num_instances_.fetch_add(1)); + RETURN_NOT_OK(InitMinidumpExceptionHandler()); + RETURN_NOT_OK(StartUserSignalHandlerThread()); + return Status::OK(); +} + +void MinidumpExceptionHandler::UnregisterMinidumpExceptionHandler() { + if (!FLAGS_enable_minidumps) return; + + StopUserSignalHandlerThread(); + CHECK_EQ(1, MinidumpExceptionHandler::current_num_instances_.fetch_sub(1)); +} + +Status MinidumpExceptionHandler::StartUserSignalHandlerThread() { + user_signal_handler_thread_running_.store(true, std::memory_order_relaxed); + return Thread::Create("minidump", "sigusr1-handler", + &MinidumpExceptionHandler::RunUserSignalHandlerThread, + this, &user_signal_handler_thread_); +} + +void MinidumpExceptionHandler::StopUserSignalHandlerThread() { + user_signal_handler_thread_running_.store(false, std::memory_order_relaxed); + std::atomic_thread_fence(std::memory_order_release); // Store before signal. + // Send SIGUSR1 signal to thread, which will wake it up. + kill(getpid(), SIGUSR1); + user_signal_handler_thread_->Join(); +} + +void MinidumpExceptionHandler::RunUserSignalHandlerThread() { + sigset_t signals = GetSigset(SIGUSR1); + while (true) { + int signal; + int err = sigwait(&signals, &signal); + CHECK(err == 0) << "sigwait(): " << ErrnoToString(err) << ": " << err; + CHECK_EQ(SIGUSR1, signal); + if (!user_signal_handler_thread_running_.load(std::memory_order_relaxed)) { + // Exit thread if we are shutting down. + return; + } + if (!WriteMinidump()) { + LOG(WARNING) << "Received USR1 signal but failed to write minidump"; + } + } +} + +#else // defined(__linux__) + +// At the time of writing, we don't support breakpad on Mac so we just stub out +// all the methods defined in the header file. 
+ +Status MinidumpExceptionHandler::InitMinidumpExceptionHandler() { + return Status::OK(); +} + +// No-op on non-Linux platforms. +Status MinidumpExceptionHandler::RegisterMinidumpExceptionHandler() { + return Status::OK(); +} + +void MinidumpExceptionHandler::UnregisterMinidumpExceptionHandler() { +} + +bool MinidumpExceptionHandler::WriteMinidump() { + return true; +} + +Status MinidumpExceptionHandler::StartUserSignalHandlerThread() { + return Status::OK(); +} + +void MinidumpExceptionHandler::StopUserSignalHandlerThread() { +} + +void MinidumpExceptionHandler::RunUserSignalHandlerThread() { +} + +#endif // defined(__linux__) + +std::atomic<int> MinidumpExceptionHandler::current_num_instances_; + +MinidumpExceptionHandler::MinidumpExceptionHandler() { + CHECK_OK(RegisterMinidumpExceptionHandler()); +} + +MinidumpExceptionHandler::~MinidumpExceptionHandler() { + UnregisterMinidumpExceptionHandler(); +} + +Status MinidumpExceptionHandler::DeleteExcessMinidumpFiles(Env* env) { + // Do not delete minidump files if minidumps are disabled. + if (!FLAGS_enable_minidumps) return Status::OK(); + + int32_t max_minidumps = FLAGS_max_minidumps; + // Disable rotation if set to 0 or less. + if (max_minidumps <= 0) return Status::OK(); + + // Minidump filenames are created by breakpad in the following format, for example: + // 7b57915b-ee6a-dbc5-21e59491-5c60a2cf.dmp. + string pattern = JoinPathSegments(minidump_dir(), "*.dmp"); + + // Use mtime to determine which minidumps to delete. While this could + // potentially be ambiguous if many minidumps were created in quick + // succession, users can always increase 'FLAGS_max_minidumps' if desired + // in order to work around the problem. 
+ return env_util::DeleteExcessFilesByPattern(env, pattern, max_minidumps); +} + +string MinidumpExceptionHandler::minidump_dir() const { + return minidump_dir_; +} + +Status BlockSigUSR1() { + sigset_t signals = GetSigset(SIGUSR1); + int ret = pthread_sigmask(SIG_BLOCK, &signals, nullptr); + if (ret == 0) return Status::OK(); + return Status::InvalidArgument("pthread_sigmask", ErrnoToString(ret), ret); +} + +} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/minidump.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/minidump.h b/be/src/kudu/util/minidump.h new file mode 100644 index 0000000..459639b --- /dev/null +++ b/be/src/kudu/util/minidump.h @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <atomic> +#include <memory> +#include <string> + +#include "kudu/gutil/ref_counted.h" +#include "kudu/util/locks.h" +#include "kudu/util/thread.h" + +namespace google_breakpad { +class ExceptionHandler; +} // namespace google_breakpad + +namespace kudu { + +class Env; +class Status; + +// While an instance of this class is in scope, a Breakpad minidump handler +// will generate a minidump for the current program if it crashes or if it +// receives a USR1 signal. This class must be instantiated after initializing +// the gflags library. When used in conjunction with glog, or other signal +// handling facilities, this class must be invoked after installing those +// signal handlers.
+// +// The BlockSigUSR1() function should be called before spawning any threads in +// order to block the USR1 signal from crashing the process. This class relies +// on that signal being blocked by all threads in order to safely generate +// minidumps in response to the USR1 signal. +// +// Only one instance of this class may be instantiated at a time. +// +// For more information on Google Breakpad, see its documentation at: +// http://chromium.googlesource.com/breakpad/breakpad/+/master/docs/getting_started_with_breakpad.md +class MinidumpExceptionHandler { + public: + MinidumpExceptionHandler(); + ~MinidumpExceptionHandler(); + + // Write a minidump immediately. Can be used to generate a minidump + // independently of a crash. Should not be called from a signal handler or a + // crash context because it uses the heap. + bool WriteMinidump(); + + // Deletes excess minidump files beyond the configured max of + // 'FLAGS_max_minidumps'. Uses the file's modified time to determine recency. + // Does nothing if 'FLAGS_enable_minidumps' is false. + Status DeleteExcessMinidumpFiles(Env* env); + + // Get the path to the directory that will be used for writing minidumps. + std::string minidump_dir() const; + + private: + Status InitMinidumpExceptionHandler(); + Status RegisterMinidumpExceptionHandler(); + void UnregisterMinidumpExceptionHandler(); + + Status StartUserSignalHandlerThread(); + void StopUserSignalHandlerThread(); + void RunUserSignalHandlerThread(); + + // The number of instances of this class that are currently in existence. + // We keep this counter in order to force a crash if more than one is running + // at a time, as a sanity check. + static std::atomic<int> current_num_instances_; + + #ifndef __APPLE__ + std::atomic<bool> user_signal_handler_thread_running_;// Unused in macOS build. + #endif + + scoped_refptr<Thread> user_signal_handler_thread_; + + // Breakpad ExceptionHandler.
It registers its own signal handlers to write + // minidump files during process crashes, but can also be used to write + // minidumps directly. + std::unique_ptr<google_breakpad::ExceptionHandler> breakpad_handler_; + + // Directory in which we store our minidumps. + std::string minidump_dir_; +}; + +// Block SIGUSR1 from threads handling it. +// This should be called by the process before spawning any threads so that a +// USR1 signal will cause a minidump to be generated instead of a crash. +Status BlockSigUSR1(); + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/monotime-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/monotime-test.cc b/be/src/kudu/util/monotime-test.cc new file mode 100644 index 0000000..f193ecd --- /dev/null +++ b/be/src/kudu/util/monotime-test.cc @@ -0,0 +1,419 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "kudu/util/monotime.h" + +#include <sys/time.h> +#include <unistd.h> + +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "kudu/util/test_util.h" + +namespace kudu { + +TEST(TestMonoTime, TestMonotonicity) { + alarm(360); + MonoTime prev(MonoTime::Now()); + MonoTime next; + + do { + next = MonoTime::Now(); + //LOG(INFO) << " next = " << next.ToString(); + } while (!prev.ComesBefore(next)); + ASSERT_FALSE(next.ComesBefore(prev)); + alarm(0); +} + +TEST(TestMonoTime, TestComparison) { + MonoTime now(MonoTime::Now()); + MonoTime future(now); + future.AddDelta(MonoDelta::FromNanoseconds(1L)); + + ASSERT_GT((future - now).ToNanoseconds(), 0); + ASSERT_LT((now - future).ToNanoseconds(), 0); + ASSERT_EQ((now - now).ToNanoseconds(), 0); + + MonoDelta nano(MonoDelta::FromNanoseconds(1L)); + MonoDelta mil(MonoDelta::FromMilliseconds(1L)); + MonoDelta sec(MonoDelta::FromSeconds(1.0)); + + ASSERT_TRUE(nano.LessThan(mil)); + ASSERT_TRUE(mil.LessThan(sec)); + ASSERT_TRUE(mil.MoreThan(nano)); + ASSERT_TRUE(sec.MoreThan(mil)); +} + +TEST(TestMonoTime, TestTimeVal) { + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 0; + + // Normal conversion case. + MonoDelta one_sec_one_micro(MonoDelta::FromNanoseconds(1000001000L)); + one_sec_one_micro.ToTimeVal(&tv); + ASSERT_EQ(1, tv.tv_sec); + ASSERT_EQ(1, tv.tv_usec); + + // Case where we are still positive but sub-micro. + // Round up to nearest microsecond. This is to avoid infinite timeouts + // in APIs that take a struct timeval. + MonoDelta zero_sec_one_nano(MonoDelta::FromNanoseconds(1L)); + zero_sec_one_nano.ToTimeVal(&tv); + ASSERT_EQ(0, tv.tv_sec); + ASSERT_EQ(1, tv.tv_usec); // Special case: 1ns rounds up to 1us. + + // Negative conversion case. Ensure the timeval is normalized. + // That means sec is negative and usec is positive.
+ MonoDelta neg_micro(MonoDelta::FromMicroseconds(-1L)); + ASSERT_EQ(-1000, neg_micro.ToNanoseconds()); + neg_micro.ToTimeVal(&tv); + ASSERT_EQ(-1, tv.tv_sec); + ASSERT_EQ(999999, tv.tv_usec); + + // Case where we are still negative but sub-micro. + // Round away from zero to -1us (normalized). This is to avoid infinite timeouts + // in APIs that take a struct timeval and for consistency. + MonoDelta zero_sec_neg_one_nano(MonoDelta::FromNanoseconds(-1L)); + zero_sec_neg_one_nano.ToTimeVal(&tv); + ASSERT_EQ(-1, tv.tv_sec); + ASSERT_EQ(999999, tv.tv_usec); +} + +TEST(TestMonoTime, TestTimeSpec) { + MonoTime one_sec_one_nano_expected(1000000001L); + struct timespec ts; + ts.tv_sec = 1; + ts.tv_nsec = 1; + MonoTime one_sec_one_nano_actual(ts); + ASSERT_EQ(0, one_sec_one_nano_expected.GetDeltaSince(one_sec_one_nano_actual).ToNanoseconds()); + + MonoDelta zero_sec_two_nanos(MonoDelta::FromNanoseconds(2L)); + zero_sec_two_nanos.ToTimeSpec(&ts); + ASSERT_EQ(0, ts.tv_sec); + ASSERT_EQ(2, ts.tv_nsec); + + // Negative conversion case. Ensure the timespec is normalized. + // That means sec is negative and nsec is positive.
+ MonoDelta neg_nano(MonoDelta::FromNanoseconds(-1L)); + ASSERT_EQ(-1, neg_nano.ToNanoseconds()); + neg_nano.ToTimeSpec(&ts); + ASSERT_EQ(-1, ts.tv_sec); + ASSERT_EQ(999999999, ts.tv_nsec); + +} + +TEST(TestMonoTime, TestDeltas) { + alarm(360); + const MonoDelta max_delta(MonoDelta::FromSeconds(0.1)); + MonoTime prev(MonoTime::Now()); + MonoTime next; + MonoDelta cur_delta; + do { + next = MonoTime::Now(); + cur_delta = next.GetDeltaSince(prev); + } while (cur_delta.LessThan(max_delta)); + alarm(0); +} + +TEST(TestMonoTime, TestDeltaConversions) { + // TODO: Reliably test MonoDelta::FromSeconds() considering floating-point rounding errors + + MonoDelta mil(MonoDelta::FromMilliseconds(500)); + ASSERT_EQ(500 * MonoTime::kNanosecondsPerMillisecond, mil.nano_delta_); + + MonoDelta micro(MonoDelta::FromMicroseconds(500)); + ASSERT_EQ(500 * MonoTime::kNanosecondsPerMicrosecond, micro.nano_delta_); + + MonoDelta nano(MonoDelta::FromNanoseconds(500)); + ASSERT_EQ(500, nano.nano_delta_); +} + +static void DoTestMonoTimePerf() { + const MonoDelta max_delta(MonoDelta::FromMilliseconds(500)); + uint64_t num_calls = 0; + MonoTime prev(MonoTime::Now()); + MonoTime next; + MonoDelta cur_delta; + do { + next = MonoTime::Now(); + cur_delta = next.GetDeltaSince(prev); + num_calls++; + } while (cur_delta.LessThan(max_delta)); + LOG(INFO) << "DoTestMonoTimePerf():" + << num_calls << " in " + << max_delta.ToString() << " seconds."; +} + +TEST(TestMonoTime, TestSleepFor) { + MonoTime start = MonoTime::Now(); + MonoDelta sleep = MonoDelta::FromMilliseconds(100); + SleepFor(sleep); + MonoTime end = MonoTime::Now(); + MonoDelta actualSleep = end.GetDeltaSince(start); + ASSERT_GE(actualSleep.ToNanoseconds(), sleep.ToNanoseconds()); +} + +TEST(TestMonoTime, TestSleepForOverflow) { + if (!AllowSlowTests()) { + LOG(INFO) << "Skipping test because it sleeps for ~4s"; + return; + } + + // This quantity (~4s sleep) overflows a 32-bit integer such that + // the value becomes 0. 
+ MonoTime start = MonoTime::Now(); + MonoDelta sleep = MonoDelta::FromNanoseconds(1L << 32); + SleepFor(sleep); + MonoTime end = MonoTime::Now(); + MonoDelta actualSleep = end.GetDeltaSince(start); + ASSERT_GE(actualSleep.ToNanoseconds(), sleep.ToNanoseconds()); +} + +// Test functionality of the handy operators for MonoTime/MonoDelta objects. +// The test assumes that the core functionality provided by the +// MonoTime/MonoDelta objects are in place, and it tests that the operators +// have the expected behavior expressed in terms of already existing, +// semantically equivalent methods. +TEST(TestMonoTime, TestOperators) { + // MonoTime& MonoTime::operator+=(const MonoDelta& delta); + { + MonoTime tmp = MonoTime::Now(); + MonoTime start = tmp; + MonoDelta delta = MonoDelta::FromMilliseconds(100); + MonoTime o_end = start; + o_end += delta; + tmp.AddDelta(delta); + MonoTime m_end = tmp; + EXPECT_TRUE(m_end.Equals(o_end)); + } + + // MonoTime& MonoTime::operator-=(const MonoDelta& delta); + { + MonoTime tmp = MonoTime::Now(); + MonoTime start = tmp; + MonoDelta delta = MonoDelta::FromMilliseconds(100); + MonoTime o_end = start; + o_end -= delta; + tmp.AddDelta(MonoDelta::FromNanoseconds(-delta.ToNanoseconds())); + MonoTime m_end = tmp; + EXPECT_TRUE(m_end.Equals(o_end)); + } + + // bool operator==(const MonoDelta& lhs, const MonoDelta& rhs); + { + MonoDelta dn = MonoDelta::FromNanoseconds(0); + MonoDelta dm = MonoDelta::FromMicroseconds(0); + ASSERT_TRUE(dn.Equals(dm)); + EXPECT_TRUE(dn == dm); + EXPECT_TRUE(dm == dn); + } + + // bool operator!=(const MonoDelta& lhs, const MonoDelta& rhs); + { + MonoDelta dn = MonoDelta::FromNanoseconds(1); + MonoDelta dm = MonoDelta::FromMicroseconds(1); + ASSERT_FALSE(dn.Equals(dm)); + EXPECT_TRUE(dn != dm); + EXPECT_TRUE(dm != dn); + } + + // bool operator<(const MonoDelta& lhs, const MonoDelta& rhs); + { + MonoDelta d0 = MonoDelta::FromNanoseconds(0); + MonoDelta d1 = MonoDelta::FromNanoseconds(1); + 
ASSERT_TRUE(d0.LessThan(d1)); + EXPECT_TRUE(d0 < d1); + } + + // bool operator<=(const MonoDelta& lhs, const MonoDelta& rhs); + { + MonoDelta d0 = MonoDelta::FromNanoseconds(0); + MonoDelta d1 = MonoDelta::FromNanoseconds(1); + ASSERT_TRUE(d0.LessThan(d1)); + EXPECT_TRUE(d0 <= d1); + + MonoDelta d20 = MonoDelta::FromNanoseconds(2); + MonoDelta d21 = MonoDelta::FromNanoseconds(2); + ASSERT_TRUE(d20.Equals(d21)); + EXPECT_TRUE(d20 <= d21); + } + + // bool operator>(const MonoDelta& lhs, const MonoDelta& rhs); + { + MonoDelta d0 = MonoDelta::FromNanoseconds(0); + MonoDelta d1 = MonoDelta::FromNanoseconds(1); + ASSERT_TRUE(d1.MoreThan(d0)); + EXPECT_TRUE(d1 > d0); + } + + // bool operator>=(const MonoDelta& lhs, const MonoDelta& rhs); + { + MonoDelta d0 = MonoDelta::FromNanoseconds(0); + MonoDelta d1 = MonoDelta::FromNanoseconds(1); + ASSERT_TRUE(d1.MoreThan(d0)); + EXPECT_TRUE(d1 >= d0); + + MonoDelta d20 = MonoDelta::FromNanoseconds(2); + MonoDelta d21 = MonoDelta::FromNanoseconds(2); + ASSERT_TRUE(d20.Equals(d21)); + EXPECT_TRUE(d21 >= d20); + } + + // bool operator==(const MonoTime& lhs, const MonoTime& rhs); + { + MonoTime t0 = MonoTime::Now(); + MonoTime t1(t0); + ASSERT_TRUE(t0.Equals(t1)); + ASSERT_TRUE(t1.Equals(t0)); + EXPECT_TRUE(t0 == t1); + EXPECT_TRUE(t1 == t0); + } + + // bool operator!=(const MonoTime& lhs, const MonoTime& rhs); + { + MonoTime t0 = MonoTime::Now(); + MonoTime t1(t0 + MonoDelta::FromMilliseconds(100)); + ASSERT_TRUE(!t0.Equals(t1)); + ASSERT_TRUE(!t1.Equals(t0)); + EXPECT_TRUE(t0 != t1); + EXPECT_TRUE(t1 != t0); + } + + // bool operator<(const MonoTime& lhs, const MonoTime& rhs); + { + MonoTime t0 = MonoTime::Now(); + MonoTime t1(t0 + MonoDelta::FromMilliseconds(100)); + ASSERT_TRUE(t0.ComesBefore(t1)); + ASSERT_FALSE(t1.ComesBefore(t0)); + EXPECT_TRUE(t0 < t1); + EXPECT_FALSE(t1 < t0); + } + + // bool operator<=(const MonoTime& lhs, const MonoTime& rhs); + { + MonoTime t00 = MonoTime::Now(); + MonoTime t01(t00); +
ASSERT_TRUE(t00.Equals(t00)); + ASSERT_TRUE(t00.Equals(t01)); + ASSERT_TRUE(t01.Equals(t00)); + ASSERT_TRUE(t01.Equals(t01)); + EXPECT_TRUE(t00 <= t00); + EXPECT_TRUE(t00 <= t01); + EXPECT_TRUE(t01 <= t00); + EXPECT_TRUE(t01 <= t01); + + MonoTime t1(t00 + MonoDelta::FromMilliseconds(100)); + ASSERT_TRUE(t00.ComesBefore(t1)); + ASSERT_TRUE(t01.ComesBefore(t1)); + ASSERT_FALSE(t1.ComesBefore(t00)); + ASSERT_FALSE(t1.ComesBefore(t01)); + EXPECT_TRUE(t00 <= t1); + EXPECT_TRUE(t01 <= t1); + EXPECT_FALSE(t1 <= t00); + EXPECT_FALSE(t1 <= t01); + } + + // bool operator>(const MonoTime& lhs, const MonoTime& rhs); + { + MonoTime t0 = MonoTime::Now(); + MonoTime t1(t0 + MonoDelta::FromMilliseconds(100)); + ASSERT_TRUE(t0.ComesBefore(t1)); + ASSERT_FALSE(t1.ComesBefore(t0)); + EXPECT_TRUE(t1 > t0); + EXPECT_FALSE(t0 > t1); + } + + // bool operator>=(const MonoTime& lhs, const MonoTime& rhs); + { + MonoTime t00 = MonoTime::Now(); + MonoTime t01(t00); + ASSERT_TRUE(t00.Equals(t00)); + ASSERT_TRUE(t00.Equals(t01)); + ASSERT_TRUE(t01.Equals(t00)); + ASSERT_TRUE(t01.Equals(t01)); + EXPECT_TRUE(t00 >= t00); + EXPECT_TRUE(t00 >= t01); + EXPECT_TRUE(t01 >= t00); + EXPECT_TRUE(t01 >= t01); + + MonoTime t1(t00 + MonoDelta::FromMilliseconds(100)); + ASSERT_TRUE(t00.ComesBefore(t1)); + ASSERT_TRUE(t01.ComesBefore(t1)); + ASSERT_FALSE(t1.ComesBefore(t00)); + ASSERT_FALSE(t1.ComesBefore(t01)); + EXPECT_FALSE(t00 >= t1); + EXPECT_FALSE(t01 >= t1); + EXPECT_TRUE(t1 >= t00); + EXPECT_TRUE(t1 >= t01); + } + + // MonoDelta operator-(const MonoTime& t0, const MonoTime& t1); + { + const int64_t deltas[] = { 100, -100 }; + + MonoTime tmp = MonoTime::Now(); + for (auto d : deltas) { + MonoDelta delta = MonoDelta::FromMilliseconds(d); + + MonoTime start = tmp; + tmp.AddDelta(delta); + MonoTime end = tmp; + MonoDelta delta_o = end - start; + EXPECT_TRUE(delta.Equals(delta_o)); + } + } + + // MonoTime operator+(const MonoTime& t, const MonoDelta& delta); + { + MonoTime start = MonoTime::Now(); + +
MonoDelta delta_0 = MonoDelta::FromMilliseconds(0); + MonoTime end_0 = start + delta_0; + EXPECT_TRUE(end_0.Equals(start)); + + MonoDelta delta_1 = MonoDelta::FromMilliseconds(1); + MonoTime end_1 = start + delta_1; + EXPECT_TRUE(end_1 > end_0); + end_0.AddDelta(delta_1); + EXPECT_TRUE(end_0.Equals(end_1)); + } + + // MonoTime operator-(const MonoTime& t, const MonoDelta& delta); + { + MonoTime start = MonoTime::Now(); + + MonoDelta delta_0 = MonoDelta::FromMilliseconds(0); + MonoTime end_0 = start - delta_0; + EXPECT_TRUE(end_0.Equals(start)); + + MonoDelta delta_1 = MonoDelta::FromMilliseconds(1); + MonoTime end_1 = start - delta_1; + EXPECT_TRUE(end_1 < end_0); + end_1.AddDelta(delta_1); + EXPECT_TRUE(end_1.Equals(end_0)); + } +} + +TEST(TestMonoTimePerf, TestMonoTimePerf) { + alarm(360); + DoTestMonoTimePerf(); + alarm(0); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/monotime.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/monotime.cc b/be/src/kudu/util/monotime.cc new file mode 100644 index 0000000..b67be42 --- /dev/null +++ b/be/src/kudu/util/monotime.cc @@ -0,0 +1,325 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
// See the License for the
// specific language governing permissions and limitations
// under the License.

// Implementation of MonoDelta (a signed interval stored as nanoseconds) and
// MonoTime (a point in time stored as nanoseconds), together with SleepFor()
// and the free comparison / arithmetic operators declared in monotime.h.

#include "kudu/util/monotime.h"

#include <glog/logging.h>
#include <limits>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <time.h>

#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/sysinfo.h"
#include "kudu/gutil/walltime.h"
#include "kudu/util/thread_restrictions.h"

namespace kudu {

// Number of whole seconds whose nanosecond count still fits in 2^63 - 1.
// Used by MonoTime(const timespec&) to reject values that would overflow
// the 64-bit nanosecond representation.
#define MAX_MONOTONIC_SECONDS \
  (((1ULL<<63) - 1ULL) /(int64_t)MonoTime::kNanosecondsPerSecond)


///
/// MonoDelta
///

// Sentinel stored by the default constructor; Initialized() compares
// against it.
const int64_t MonoDelta::kUninitialized = kint64min;

// Builds a delta from (possibly fractional) seconds. The product is converted
// to int64_t implicitly, so any fractional nanosecond is discarded.
MonoDelta MonoDelta::FromSeconds(double seconds) {
  int64_t delta = seconds * MonoTime::kNanosecondsPerSecond;
  return MonoDelta(delta);
}

// Builds a delta from a whole number of milliseconds.
MonoDelta MonoDelta::FromMilliseconds(int64_t ms) {
  return MonoDelta(ms * MonoTime::kNanosecondsPerMillisecond);
}

// Builds a delta from a whole number of microseconds.
MonoDelta MonoDelta::FromMicroseconds(int64_t us) {
  return MonoDelta(us * MonoTime::kNanosecondsPerMicrosecond);
}

// Builds a delta from a whole number of nanoseconds.
MonoDelta MonoDelta::FromNanoseconds(int64_t ns) {
  return MonoDelta(ns);
}

// Default-constructed deltas hold the kUninitialized sentinel.
MonoDelta::MonoDelta()
  : nano_delta_(kUninitialized) {
}

// True unless this delta still holds the kUninitialized sentinel.
bool MonoDelta::Initialized() const {
  return nano_delta_ != kUninitialized;
}

// Strict "shorter than" comparison; both operands must be initialized
// (checked with DCHECK).
bool MonoDelta::LessThan(const MonoDelta &rhs) const {
  DCHECK(Initialized());
  DCHECK(rhs.Initialized());
  return nano_delta_ < rhs.nano_delta_;
}

// Strict "longer than" comparison; both operands must be initialized
// (checked with DCHECK).
bool MonoDelta::MoreThan(const MonoDelta &rhs) const {
  DCHECK(Initialized());
  DCHECK(rhs.Initialized());
  return nano_delta_ > rhs.nano_delta_;
}

// Equality of the stored nanosecond counts; both operands must be
// initialized (checked with DCHECK).
bool MonoDelta::Equals(const MonoDelta &rhs) const {
  DCHECK(Initialized());
  DCHECK(rhs.Initialized());
  return nano_delta_ == rhs.nano_delta_;
}

// Formats the interval as seconds with three decimals, e.g. "0.100s".
std::string MonoDelta::ToString() const {
  return StringPrintf("%.3fs", ToSeconds());
}

// Private constructor used by the From*() factories and by MonoTime.
MonoDelta::MonoDelta(int64_t delta)
  : nano_delta_(delta) {
}

// Returns the interval in seconds as a double.
double MonoDelta::ToSeconds() const {
  DCHECK(Initialized());
  double d(nano_delta_);
  d /= MonoTime::kNanosecondsPerSecond;
  return d;
}

// Returns the stored nanosecond count unchanged.
int64_t MonoDelta::ToNanoseconds() const {
  DCHECK(Initialized());
  return nano_delta_;
}

// Returns whole microseconds; the integer division discards the remainder.
int64_t MonoDelta::ToMicroseconds() const {
  DCHECK(Initialized());
  return nano_delta_ / MonoTime::kNanosecondsPerMicrosecond;
}

// Returns whole milliseconds; the integer division discards the remainder.
int64_t MonoDelta::ToMilliseconds() const {
  DCHECK(Initialized());
  return nano_delta_ / MonoTime::kNanosecondsPerMillisecond;
}

// Writes this interval into *tv with microsecond resolution. The result is
// normalized so that tv_usec is never negative, and a non-zero interval is
// never reported as an all-zero timeval (see the corner cases below).
void MonoDelta::ToTimeVal(struct timeval *tv) const {
  DCHECK(Initialized());
  tv->tv_sec = nano_delta_ / MonoTime::kNanosecondsPerSecond;
  tv->tv_usec = (nano_delta_ - (tv->tv_sec * MonoTime::kNanosecondsPerSecond))
      / MonoTime::kNanosecondsPerMicrosecond;

  // tv_usec must be between 0 and 999999.
  // There is little use for negative timevals so wrap it in PREDICT_FALSE.
  if (PREDICT_FALSE(tv->tv_usec < 0)) {
    --(tv->tv_sec);
    tv->tv_usec += 1000000;
  }

  // Catch positive corner case where we "round down" and could potentially set a timeout of 0.
  // Make it 1 usec.
  if (PREDICT_FALSE(tv->tv_usec == 0 && tv->tv_sec == 0 && nano_delta_ > 0)) {
    tv->tv_usec = 1;
  }

  // Catch negative corner case where we "round down" and could potentially set a timeout of 0.
  // Make it -1 usec (but normalized, so tv_usec is not negative).
  if (PREDICT_FALSE(tv->tv_usec == 0 && tv->tv_sec == 0 && nano_delta_ < 0)) {
    tv->tv_sec = -1;
    tv->tv_usec = 999999;
  }
}


// Splits a nanosecond count into seconds and a non-negative nanosecond
// remainder and stores them in *ts.
void MonoDelta::NanosToTimeSpec(int64_t nanos, struct timespec* ts) {
  ts->tv_sec = nanos / MonoTime::kNanosecondsPerSecond;
  ts->tv_nsec = nanos - (ts->tv_sec * MonoTime::kNanosecondsPerSecond);

  // tv_nsec must be between 0 and 999999999.
  // There is little use for negative timespecs so wrap it in PREDICT_FALSE.
  if (PREDICT_FALSE(ts->tv_nsec < 0)) {
    --(ts->tv_sec);
    ts->tv_nsec += MonoTime::kNanosecondsPerSecond;
  }
}

// Writes this interval into *ts via NanosToTimeSpec().
void MonoDelta::ToTimeSpec(struct timespec *ts) const {
  DCHECK(Initialized());
  NanosToTimeSpec(nano_delta_, ts);
}

///
/// MonoTime
///

// Returns the current time. On Apple platforms the value comes from
// walltime_internal::GetMonoTimeNanos(); elsewhere from
// clock_gettime(CLOCK_MONOTONIC), whose success is enforced with PCHECK.
MonoTime MonoTime::Now() {
#if defined(__APPLE__)
  return MonoTime(walltime_internal::GetMonoTimeNanos());
# else
  struct timespec ts;
  PCHECK(clock_gettime(CLOCK_MONOTONIC, &ts) == 0);
  return MonoTime(ts);
#endif // defined(__APPLE__)
}

// Largest representable time: the maximum int64_t nanosecond count.
MonoTime MonoTime::Max() {
  return MonoTime(std::numeric_limits<int64_t>::max());
}

// Smallest value that still counts as initialized: Initialized() treats a
// nanosecond count of 0 as "uninitialized", so this returns 1 rather than 0.
MonoTime MonoTime::Min() {
  return MonoTime(1);
}

// Returns a reference to whichever argument is earlier; when both are equal,
// 'a' is returned.
const MonoTime& MonoTime::Earliest(const MonoTime& a, const MonoTime& b) {
  if (b.nanos_ < a.nanos_) {
    return b;
  }
  return a;
}

// Default-constructed times hold 0, which Initialized() reports as
// "uninitialized".
MonoTime::MonoTime()
  : nanos_(0) {
}

// True unless the stored nanosecond count is 0.
bool MonoTime::Initialized() const {
  return nanos_ != 0;
}

// Returns (*this - rhs) as a MonoDelta; both times must be initialized
// (checked with DCHECK).
MonoDelta MonoTime::GetDeltaSince(const MonoTime &rhs) const {
  DCHECK(Initialized());
  DCHECK(rhs.Initialized());
  int64_t delta(nanos_);
  delta -= rhs.nanos_;
  return MonoDelta(delta);
}

// Advances this time by 'delta' in place.
// NOTE(review): this reads delta.nano_delta_ directly, so unlike
// MonoDelta::ToNanoseconds() it does not DCHECK that 'delta' is
// initialized -- confirm whether that is intended.
void MonoTime::AddDelta(const MonoDelta &delta) {
  DCHECK(Initialized());
  nanos_ += delta.nano_delta_;
}

// Strict "earlier than" comparison; both times must be initialized
// (checked with DCHECK).
bool MonoTime::ComesBefore(const MonoTime &rhs) const {
  DCHECK(Initialized());
  DCHECK(rhs.Initialized());
  return nanos_ < rhs.nanos_;
}

// Formats the time as seconds with three decimals, e.g. "12.345s".
std::string MonoTime::ToString() const {
  return StringPrintf("%.3fs", ToSeconds());
}

// Equality of the stored nanosecond counts. No Initialized() check is made
// here, so two default-constructed times compare equal.
bool MonoTime::Equals(const MonoTime& other) const {
  return nanos_ == other.nanos_;
}

// In-place addition; forwards to AddDelta().
MonoTime& MonoTime::operator+=(const MonoDelta& delta) {
  this->AddDelta(delta);
  return *this;
}

// In-place subtraction: adds the negated delta via AddDelta().
// NOTE(review): the negation reads delta.nano_delta_ directly, whereas the
// free operator-(MonoTime, MonoDelta) goes through ToNanoseconds() and its
// Initialized() DCHECK -- confirm whether the difference is intended.
MonoTime& MonoTime::operator-=(const MonoDelta& delta) {
  this->AddDelta(MonoDelta(-1 * delta.nano_delta_));
  return *this;
}

// Builds a MonoTime from a timespec (seconds + nanoseconds).
MonoTime::MonoTime(const struct timespec &ts) {
  // Monotonic time resets when the machine reboots. The 64-bit limitation
  // means that we can't represent times larger than 292 years, which should be
  // adequate.
  CHECK_LT(ts.tv_sec, MAX_MONOTONIC_SECONDS);
  nanos_ = ts.tv_sec;
  nanos_ *= MonoTime::kNanosecondsPerSecond;
  nanos_ += ts.tv_nsec;
}

// Builds a MonoTime directly from a nanosecond count.
MonoTime::MonoTime(int64_t nanos)
  : nanos_(nanos) {
}

// Returns the stored nanosecond count converted to seconds.
double MonoTime::ToSeconds() const {
  double d(nanos_);
  d /= MonoTime::kNanosecondsPerSecond;
  return d;
}

// Sleeps for 'delta'. First asserts that the calling thread is allowed to
// wait, then delegates to base::SleepForNanoseconds().
void SleepFor(const MonoDelta& delta) {
  ThreadRestrictions::AssertWaitAllowed();
  base::SleepForNanoseconds(delta.ToNanoseconds());
}

// Comparison operators for MonoDelta; thin wrappers over Equals(),
// LessThan() and MoreThan().
bool operator==(const MonoDelta &lhs, const MonoDelta &rhs) {
  return lhs.Equals(rhs);
}

bool operator!=(const MonoDelta &lhs, const MonoDelta &rhs) {
  return !lhs.Equals(rhs);
}

bool operator<(const MonoDelta &lhs, const MonoDelta &rhs) {
  return lhs.LessThan(rhs);
}

bool operator<=(const MonoDelta &lhs, const MonoDelta &rhs) {
  return lhs.LessThan(rhs) || lhs.Equals(rhs);
}

bool operator>(const MonoDelta &lhs, const MonoDelta &rhs) {
  return lhs.MoreThan(rhs);
}

bool operator>=(const MonoDelta &lhs, const MonoDelta &rhs) {
  return lhs.MoreThan(rhs) || lhs.Equals(rhs);
}

// Comparison operators for MonoTime; thin wrappers over Equals() and
// ComesBefore(). The "greater" forms swap the operands.
bool operator==(const MonoTime& lhs, const MonoTime& rhs) {
  return lhs.Equals(rhs);
}

bool operator!=(const MonoTime& lhs, const MonoTime& rhs) {
  return !lhs.Equals(rhs);
}

bool operator<(const MonoTime& lhs, const MonoTime& rhs) {
  return lhs.ComesBefore(rhs);
}

bool operator<=(const MonoTime& lhs, const MonoTime& rhs) {
  return lhs.ComesBefore(rhs) || lhs.Equals(rhs);
}

bool operator>(const MonoTime& lhs, const MonoTime& rhs) {
  return rhs.ComesBefore(lhs);
}

bool operator>=(const MonoTime& lhs, const MonoTime& rhs) {
  return rhs.ComesBefore(lhs) || rhs.Equals(lhs);
}

// Returns a copy of 't' advanced by 'delta'.
MonoTime operator+(const MonoTime& t, const MonoDelta& delta) {
  MonoTime tmp(t);
  tmp.AddDelta(delta);
  return tmp;
}

// Returns a copy of 't' moved back by 'delta'.
MonoTime operator-(const MonoTime& t, const MonoDelta& delta) {
  MonoTime tmp(t);
  tmp.AddDelta(MonoDelta::FromNanoseconds(-delta.ToNanoseconds()));
  return tmp;
}

// Returns the interval from 't_beg' to 't_end'.
MonoDelta operator-(const MonoTime& t_end, const MonoTime& t_beg) {
  return t_end.GetDeltaSince(t_beg);
}

} // namespace kudu

// http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/monotime.h
// ----------------------------------------------------------------------
// diff --git a/be/src/kudu/util/monotime.h b/be/src/kudu/util/monotime.h
// new file mode 100644
// index 0000000..007c54d
// --- /dev/null
// +++ b/be/src/kudu/util/monotime.h
// @@ -0,0 +1,412 @@

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef KUDU_UTIL_MONOTIME_H
#define KUDU_UTIL_MONOTIME_H

#include <stdint.h>
#include <string>

#ifdef KUDU_HEADERS_NO_STUBS
#include <gtest/gtest_prod.h>
#else
// This is a poor module interdependency, but the stubs are header-only and
// it's only for exported header builds, so we'll make an exception.
#include "kudu/client/stubs.h"
#endif

#include "kudu/util/kudu_export.h"

struct timeval;
struct timespec;

namespace kudu {
class MonoTime;

/// @brief A representation of a time interval.
///
/// The MonoDelta class represents an elapsed duration of time -- i.e.
/// the delta between two MonoTime instances.
class KUDU_EXPORT MonoDelta {
 public:
  /// @name Converters from seconds representation (and ubiquitous SI prefixes).
  ///
  /// @param [in] seconds/ms/us/ns
  ///   Time interval representation in seconds (with ubiquitous SI prefixes).
  /// @return The resulting MonoDelta object initialized in accordance with
  ///   the specified parameter.
  ///
  ///@{
  static MonoDelta FromSeconds(double seconds);
  static MonoDelta FromMilliseconds(int64_t ms);
  static MonoDelta FromMicroseconds(int64_t us);
  static MonoDelta FromNanoseconds(int64_t ns);
  ///@}

  /// Build a MonoDelta object.
  ///
  /// @note A MonoDelta instance built with this default constructor is
  ///   "uninitialized" and may not be used for any operation.
  MonoDelta();

  /// @return @c true iff this object is initialized.
  bool Initialized() const;

  /// Check whether this time interval is shorter than the specified one.
  ///
  /// @param [in] rhs
  ///   A time interval for comparison.
  /// @return @c true iff this time interval is strictly shorter
  ///   than the specified one.
  bool LessThan(const MonoDelta &rhs) const;

  /// Check whether this time interval is longer than the specified one.
  ///
  /// @param [in] rhs
  ///   A time interval for comparison.
  /// @return @c true iff this time interval is strictly longer
  ///   than the specified one.
  bool MoreThan(const MonoDelta &rhs) const;

  /// Check whether this time interval has the same duration
  /// as the specified one.
  ///
  /// @param [in] rhs
  ///   A time interval for comparison.
  /// @return @c true iff this time interval has the same duration as
  ///   the specified one.
  bool Equals(const MonoDelta &rhs) const;

  /// @return String representation of this interval's duration (in seconds).
  std::string ToString() const;

  /// @name Converters into seconds representation (and ubiquitous SI prefixes).
  ///
  /// @return Representation of the time interval in appropriate SI units.
  ///
  ///@{
  double ToSeconds() const;
  int64_t ToMilliseconds() const;
  int64_t ToMicroseconds() const;
  int64_t ToNanoseconds() const;
  ///@}

  /// Represent this time interval as a timeval structure, with microsecond
  /// accuracy.
  ///
  /// @param [out] tv
  ///   Placeholder for the result value.
  void ToTimeVal(struct timeval *tv) const;

  /// Represent this time interval as a timespec structure, with nanosecond
  /// accuracy.
  ///
  /// @param [out] ts
  ///   Placeholder for the result value.
  void ToTimeSpec(struct timespec *ts) const;

  /// Convert a nanosecond value to a timespec.
  ///
  /// @param [in] nanos
  ///   Representation of a relative point in time in nanoseconds.
  /// @param [out] ts
  ///   Placeholder for the resulting timespec representation.
  static void NanosToTimeSpec(int64_t nanos, struct timespec* ts);

 private:
  // Sentinel value of nano_delta_ that marks a default-constructed
  // (uninitialized) object.
  static const int64_t kUninitialized;

  friend class MonoTime;
  FRIEND_TEST(TestMonoTime, TestDeltaConversions);

  // Builds a delta directly from a nanosecond count.
  explicit MonoDelta(int64_t delta);

  // The interval, in nanoseconds.
  int64_t nano_delta_;
};

/// @brief Representation of a particular point in time.
///
/// The MonoTime class represents a particular point in time,
/// relative to some fixed but unspecified reference point.
///
/// This time is monotonic, meaning that if the user changes his or her system
/// clock, the monotime does not change.
class KUDU_EXPORT MonoTime {
 public:
  /// @name Conversion constants for ubiquitous time units.
  ///
  ///@{
  static const int64_t kNanosecondsPerSecond = 1000000000L;
  static const int64_t kNanosecondsPerMillisecond = 1000000L;
  static const int64_t kNanosecondsPerMicrosecond = 1000L;

  static const int64_t kMicrosecondsPerSecond = 1000000L;
  ///@}

  /// Get current time in MonoTime representation.
  ///
  /// @return Time specification for the moment of the method's invocation.
  static MonoTime Now();

  /// @return MonoTime equal to farthest possible time into the future.
  static MonoTime Max();

  /// @return MonoTime equal to farthest possible time into the past.
  static MonoTime Min();

  /// Select the earliest between the specified time points.
  ///
  /// @param [in] a
  ///   The first MonoTime object to select from.
  /// @param [in] b
  ///   The second MonoTime object to select from.
  /// @return The earliest (minimum) of the two monotimes.
  static const MonoTime& Earliest(const MonoTime& a, const MonoTime& b);

  /// Build a MonoTime object. The resulting object is not initialized
  /// and not ready to use.
  MonoTime();

  /// @return @c true iff the object is initialized.
  bool Initialized() const;

  /// Compute time interval between the point in time specified by this
  /// and the specified object.
  ///
  /// @param [in] rhs
  ///   The object that corresponds to the left boundary of the time interval,
  ///   where this object corresponds to the right boundary of the interval.
  /// @return The resulting time interval represented as a MonoDelta object.
  MonoDelta GetDeltaSince(const MonoTime &rhs) const;

  /// Advance this object's time specification by the specified interval.
  ///
  /// @param [in] delta
  ///   The time interval to add.
  void AddDelta(const MonoDelta &delta);

  /// Check whether the point in time specified by this object is earlier
  /// than the specified one.
  ///
  /// @param [in] rhs
  ///   The other MonoTime object to compare with.
  /// @return @c true iff the point in time represented by this MonoTime object
  ///   is earlier than the point in time represented by the parameter.
  bool ComesBefore(const MonoTime &rhs) const;

  /// @return String representation of the object (in seconds).
  std::string ToString() const;

  /// Check whether this object represents the same point in time as the other.
  ///
  /// @param [in] other
  ///   The other MonoTime object to compare.
  /// @return @c true iff the point in time represented by this MonoTime object
  ///   is the same as the one represented by the other.
  bool Equals(const MonoTime& other) const;

  /// @name Syntactic sugar: increment/decrement operators for MonoTime.
  ///@{
  ///
  /// Add a delta to the point in time represented by the object.
  ///
  /// @param [in] delta
  ///   The delta to add.
  /// @return Reference to the modified object.
  MonoTime& operator+=(const MonoDelta& delta);

  /// Subtract a delta from the point in time represented by the object.
  ///
  /// @param [in] delta
  ///   The delta to subtract.
  /// @return Reference to the modified object.
  MonoTime& operator-=(const MonoDelta& delta);
  ///@}

 private:
  friend class MonoDelta;
  FRIEND_TEST(TestMonoTime, TestTimeSpec);
  FRIEND_TEST(TestMonoTime, TestDeltaConversions);

  // Builds a time from a timespec (seconds plus nanoseconds).
  explicit MonoTime(const struct timespec &ts);

  // Builds a time directly from a nanosecond count.
  explicit MonoTime(int64_t nanos);

  // The stored time converted to seconds.
  double ToSeconds() const;

  // The point in time, in nanoseconds.
  int64_t nanos_;
};

/// Sleep for an interval specified by a MonoDelta instance.
///
/// This is preferred over sleep(3), usleep(3), and nanosleep(3).
/// It's less prone to mixups with units since it uses the MonoDelta for
/// interval specification.
/// Besides, it ignores signals/EINTR, so will reliably sleep at least for the
/// MonoDelta duration.
///
/// @param [in] delta
///   The time interval to sleep for.
void KUDU_EXPORT SleepFor(const MonoDelta& delta);

/// @name Syntactic sugar: binary operators for MonoDelta.
///@{
///
/// @param [in] lhs
///   A time interval for comparison: the left-hand operand.
/// @param [in] rhs
///   A time interval for comparison: the right-hand operand.
/// @return @c true iff the time interval represented by @c lhs is equal
///   to the time interval represented by @c rhs.
bool KUDU_EXPORT operator==(const MonoDelta &lhs, const MonoDelta &rhs);

/// @param [in] lhs
///   A time interval for comparison: the left-hand operand.
/// @param [in] rhs
///   A time interval for comparison: the right-hand operand.
/// @return @c true iff the time interval represented by @c lhs is not equal
///   to the time interval represented by @c rhs.
bool KUDU_EXPORT operator!=(const MonoDelta &lhs, const MonoDelta &rhs);

/// @param [in] lhs
///   A time interval for comparison: the left-hand operand.
/// @param [in] rhs
///   A time interval for comparison: the right-hand operand.
/// @return @c true iff the time interval represented by @c lhs is shorter
///   than the time interval represented by @c rhs.
bool KUDU_EXPORT operator<(const MonoDelta &lhs, const MonoDelta &rhs);

/// @param [in] lhs
///   A time interval for comparison: the left-hand operand.
/// @param [in] rhs
///   A time interval for comparison: the right-hand operand.
/// @return @c true iff the time interval represented by @c lhs is shorter
///   than or equal to the time interval represented by @c rhs.
bool KUDU_EXPORT operator<=(const MonoDelta &lhs, const MonoDelta &rhs);

/// @param [in] lhs
///   A time interval for comparison: the left-hand operand.
/// @param [in] rhs
///   A time interval for comparison: the right-hand operand.
/// @return @c true iff the time interval represented by @c lhs is longer
///   than the time interval represented by @c rhs.
bool KUDU_EXPORT operator>(const MonoDelta &lhs, const MonoDelta &rhs);

/// @param [in] lhs
///   A time interval for comparison: the left-hand operand.
/// @param [in] rhs
///   A time interval for comparison: the right-hand operand.
/// @return @c true iff the time interval represented by @c lhs is longer
///   than or equal to the time interval represented by @c rhs.
bool KUDU_EXPORT operator>=(const MonoDelta &lhs, const MonoDelta &rhs);
///@}

/// @name Syntactic sugar: binary operators for MonoTime.
///@{
///
/// Check if the specified objects represent the same point in time.
///
/// This is a handy operator which is semantically equivalent to
/// MonoTime::Equals().
///
/// @param [in] lhs
///   The left-hand operand.
/// @param [in] rhs
///   The right-hand operand.
/// @return @c true iff the given objects represent the same point in time.
bool KUDU_EXPORT operator==(const MonoTime& lhs, const MonoTime& rhs);

/// Check if the specified objects represent different points in time.
///
/// This is a handy operator which is semantically equivalent to the negation of
/// MonoTime::Equals().
///
/// @param [in] lhs
///   The left-hand operand.
/// @param [in] rhs
///   The right-hand operand.
/// @return @c true iff the given object represents a different point in time
///   than the specified one.
bool KUDU_EXPORT operator!=(const MonoTime& lhs, const MonoTime& rhs);

/// @param [in] lhs
///   The left-hand operand.
/// @param [in] rhs
///   The right-hand operand.
/// @return @c true iff the @c lhs object represents an earlier point in time
///   than the @c rhs object.
bool KUDU_EXPORT operator<(const MonoTime& lhs, const MonoTime& rhs);

/// @param [in] lhs
///   The left-hand operand.
/// @param [in] rhs
///   The right-hand operand.
/// @return @c true iff the @c lhs object represents an earlier than or
///   the same point in time as the @c rhs object.
bool KUDU_EXPORT operator<=(const MonoTime& lhs, const MonoTime& rhs);

/// @param [in] lhs
///   The left-hand operand.
/// @param [in] rhs
///   The right-hand operand.
/// @return @c true iff the @c lhs object represents a later point in time
///   than the @c rhs object.
bool KUDU_EXPORT operator>(const MonoTime& lhs, const MonoTime& rhs);

/// @param [in] lhs
///   The left-hand operand.
/// @param [in] rhs
///   The right-hand operand.
/// @return @c true iff the @c lhs object represents a later than or
///   the same point in time as the @c rhs object.
bool KUDU_EXPORT operator>=(const MonoTime& lhs, const MonoTime& rhs);
///@}

/// @name Syntactic sugar: mixed binary operators for MonoTime/MonoDelta.
///@{
///
/// Add the specified time interval to the given point in time.
///
/// @param [in] t
///   A MonoTime object representing the given point in time.
/// @param [in] delta
///   A MonoDelta object representing the specified time interval.
/// @return A MonoTime object representing the resulting point in time.
MonoTime KUDU_EXPORT operator+(const MonoTime& t, const MonoDelta& delta);

/// Subtract the specified time interval from the given point in time.
///
/// @param [in] t
///   A MonoTime object representing the given point in time.
/// @param [in] delta
///   A MonoDelta object representing the specified time interval.
/// @return A MonoTime object representing the resulting point in time.
MonoTime KUDU_EXPORT operator-(const MonoTime& t, const MonoDelta& delta);

/// Compute the time interval between the specified points in time.
///
/// Semantically, this is equivalent to t_end.GetDeltaSince(t_begin).
///
/// @param [in] t_end
///   The second point in time. Semantically corresponds to the end
///   of the resulting time interval.
/// @param [in] t_begin
///   The first point in time. Semantically corresponds to the beginning
///   of the resulting time interval.
/// @return A MonoDelta object representing the time interval between the
///   specified points in time.
MonoDelta KUDU_EXPORT operator-(const MonoTime& t_end, const MonoTime& t_begin);
///@}

} // namespace kudu

#endif

// http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/mt-hdr_histogram-test.cc
// ----------------------------------------------------------------------
// diff --git a/be/src/kudu/util/mt-hdr_histogram-test.cc b/be/src/kudu/util/mt-hdr_histogram-test.cc
// new file mode 100644
// index 0000000..879c5e3
// --- /dev/null
// +++ b/be/src/kudu/util/mt-hdr_histogram-test.cc
// @@ -0,0 +1,111 @@

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+#include <gtest/gtest.h> +#include <gflags/gflags.h> +#include <vector> + +#include "kudu/gutil/ref_counted.h" +#include "kudu/gutil/stl_util.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/hdr_histogram.h" +#include "kudu/util/status.h" +#include "kudu/util/test_util.h" +#include "kudu/util/thread.h" + +DEFINE_int32(histogram_test_num_threads, 16, + "Number of threads to spawn for mt-hdr_histogram test"); +DEFINE_uint64(histogram_test_num_increments_per_thread, 100000LU, + "Number of times to call Increment() per thread in mt-hdr_histogram test"); + +using std::vector; + +namespace kudu { + +class MtHdrHistogramTest : public KuduTest { + public: + MtHdrHistogramTest() { + num_threads_ = FLAGS_histogram_test_num_threads; + num_times_ = FLAGS_histogram_test_num_increments_per_thread; + } + + protected: + int num_threads_; + uint64_t num_times_; +}; + +// Increment a counter a bunch of times in the same bucket +static void IncrementSameHistValue(HdrHistogram* hist, uint64_t value, uint64_t times) { + for (uint64_t i = 0; i < times; i++) { + hist->Increment(value); + } +} + +TEST_F(MtHdrHistogramTest, ConcurrentWriteTest) { + const uint64_t kValue = 1LU; + + HdrHistogram hist(100000LU, 3); + + auto threads = new scoped_refptr<kudu::Thread>[num_threads_]; + for (int i = 0; i < num_threads_; i++) { + CHECK_OK(kudu::Thread::Create("test", strings::Substitute("thread-$0", i), + IncrementSameHistValue, &hist, kValue, num_times_, &threads[i])); + } + for (int i = 0; i < num_threads_; i++) { + CHECK_OK(ThreadJoiner(threads[i].get()).Join()); + } + + HdrHistogram snapshot(hist); + ASSERT_EQ(num_threads_ * num_times_, snapshot.CountInBucketForValue(kValue)); + + delete[] threads; +} + +// Copy while writing, then iterate to ensure copies are consistent. 
+TEST_F(MtHdrHistogramTest, ConcurrentCopyWhileWritingTest) { + const int kNumCopies = 10; + const uint64_t kValue = 1; + + HdrHistogram hist(100000LU, 3); + + auto threads = new scoped_refptr<kudu::Thread>[num_threads_]; + for (int i = 0; i < num_threads_; i++) { + CHECK_OK(kudu::Thread::Create("test", strings::Substitute("thread-$0", i), + IncrementSameHistValue, &hist, kValue, num_times_, &threads[i])); + } + + // This is somewhat racy but the goal is to catch this issue at least + // most of the time. At the time of this writing, before fixing a bug where + // the total count stored in a copied histogram may not match its internal + // counts (under concurrent writes), this test fails for me on 100/100 runs. + vector<HdrHistogram *> snapshots; + ElementDeleter deleter(&snapshots); + for (int i = 0; i < kNumCopies; i++) { + snapshots.push_back(new HdrHistogram(hist)); + SleepFor(MonoDelta::FromMicroseconds(100)); + } + for (int i = 0; i < kNumCopies; i++) { + snapshots[i]->MeanValue(); // Will crash if underlying iterator is inconsistent. + } + + for (int i = 0; i < num_threads_; i++) { + CHECK_OK(ThreadJoiner(threads[i].get()).Join()); + } + + delete[] threads; +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/mt-metrics-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/mt-metrics-test.cc b/be/src/kudu/util/mt-metrics-test.cc new file mode 100644 index 0000000..b4512fb --- /dev/null +++ b/be/src/kudu/util/mt-metrics-test.cc @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <vector> + +#include <boost/bind.hpp> +#include <boost/function.hpp> +#include <gflags/gflags.h> +#include <gtest/gtest.h> + +#include "kudu/gutil/atomicops.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/debug/leakcheck_disabler.h" +#include "kudu/util/jsonwriter.h" +#include "kudu/util/metrics.h" +#include "kudu/util/monotime.h" +#include "kudu/util/test_util.h" +#include "kudu/util/thread.h" + +DEFINE_int32(mt_metrics_test_num_threads, 4, + "Number of threads to spawn in mt metrics tests"); + +METRIC_DEFINE_entity(test_entity); + +namespace kudu { + +using debug::ScopedLeakCheckDisabler; +using std::vector; + +class MultiThreadedMetricsTest : public KuduTest { + public: + static void RegisterCounters(const scoped_refptr<MetricEntity>& metric_entity, + const string& name_prefix, int num_counters); + + MetricRegistry registry_; +}; + +// Call increment on a Counter a bunch of times. +static void CountWithCounter(scoped_refptr<Counter> counter, int num_increments) { + for (int i = 0; i < num_increments; i++) { + counter->Increment(); + } +} + +// Helper function that spawns and then joins a bunch of threads. 
+static void RunWithManyThreads(boost::function<void()>* f, int num_threads) { + vector<scoped_refptr<kudu::Thread> > threads; + for (int i = 0; i < num_threads; i++) { + scoped_refptr<kudu::Thread> new_thread; + CHECK_OK(kudu::Thread::Create("test", StringPrintf("thread%d", i), + *f, &new_thread)); + threads.push_back(new_thread); + } + for (int i = 0; i < num_threads; i++) { + ASSERT_OK(ThreadJoiner(threads[i].get()).Join()); + } +} + +METRIC_DEFINE_counter(test_entity, test_counter, "Test Counter", + MetricUnit::kRequests, "Test counter"); + +// Ensure that incrementing a counter is thread-safe. +TEST_F(MultiThreadedMetricsTest, CounterIncrementTest) { + scoped_refptr<Counter> counter = new Counter(&METRIC_test_counter); + int num_threads = FLAGS_mt_metrics_test_num_threads; + int num_increments = 1000; + boost::function<void()> f = + boost::bind(CountWithCounter, counter, num_increments); + RunWithManyThreads(&f, num_threads); + ASSERT_EQ(num_threads * num_increments, counter->value()); +} + +// Helper function to register a bunch of counters in a loop. +void MultiThreadedMetricsTest::RegisterCounters( + const scoped_refptr<MetricEntity>& metric_entity, + const string& name_prefix, + int num_counters) { + uint64_t tid = Env::Default()->gettid(); + for (int i = 0; i < num_counters; i++) { + // This loop purposefully leaks metrics prototypes, because the metrics system + // expects the prototypes and their names to live forever. This is the only + // place we dynamically generate them for the purposes of a test, so it's easier + // to just leak them than to figure out a way to manage lifecycle of objects that + // are typically static. 
+ ScopedLeakCheckDisabler disabler; + + string name = strings::Substitute("$0-$1-$2", name_prefix, tid, i); + auto proto = new CounterPrototype(MetricPrototype::CtorArgs( + "test_entity", strdup(name.c_str()), "Test Counter", + MetricUnit::kOperations, "test counter")); + proto->Instantiate(metric_entity)->Increment(); + } +} + +// Ensure that adding a counter to a registry is thread-safe. +TEST_F(MultiThreadedMetricsTest, AddCounterToRegistryTest) { + scoped_refptr<MetricEntity> entity = METRIC_ENTITY_test_entity.Instantiate(®istry_, "my-test"); + int num_threads = FLAGS_mt_metrics_test_num_threads; + int num_counters = 1000; + boost::function<void()> f = + boost::bind(RegisterCounters, entity, "prefix", num_counters); + RunWithManyThreads(&f, num_threads); + ASSERT_EQ(num_threads * num_counters, entity->UnsafeMetricsMapForTests().size()); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/mt-threadlocal-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/mt-threadlocal-test.cc b/be/src/kudu/util/mt-threadlocal-test.cc new file mode 100644 index 0000000..2390b82 --- /dev/null +++ b/be/src/kudu/util/mt-threadlocal-test.cc @@ -0,0 +1,349 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <glog/logging.h> +#include <mutex> +#include <string> +#include <unordered_set> + +#include "kudu/gutil/macros.h" +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/gutil/stl_util.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/env.h" +#include "kudu/util/locks.h" +#include "kudu/util/test_util.h" +#include "kudu/util/thread.h" +#include "kudu/util/threadlocal.h" +#include "kudu/util/threadlocal_cache.h" + +using std::string; +using std::unordered_set; +using std::vector; +using strings::Substitute; + +namespace kudu { +namespace threadlocal { + +class ThreadLocalTest : public KuduTest {}; + +const int kTargetCounterVal = 1000000; + +class Counter; +typedef unordered_set<Counter*> CounterPtrSet; +typedef Mutex RegistryLockType; +typedef simple_spinlock CounterLockType; + +// Registry to provide reader access to the thread-local Counters. +// The methods are only thread-safe if the calling thread holds the lock. +class CounterRegistry { + public: + CounterRegistry() { + } + + RegistryLockType* get_lock() const { + return &lock_; + } + + bool RegisterUnlocked(Counter* counter) { + LOG(INFO) << "Called RegisterUnlocked()"; + return InsertIfNotPresent(&counters_, counter); + } + + bool UnregisterUnlocked(Counter* counter) { + LOG(INFO) << "Called UnregisterUnlocked()"; + return counters_.erase(counter) > 0; + } + + CounterPtrSet* GetCountersUnlocked() { + return &counters_; + } + + private: + mutable RegistryLockType lock_; + CounterPtrSet counters_; + DISALLOW_COPY_AND_ASSIGN(CounterRegistry); +}; + +// A simple Counter class that registers itself with a CounterRegistry. 
+class Counter { + public: + Counter(CounterRegistry* registry, int val) + : tid_(Env::Default()->gettid()), + registry_(CHECK_NOTNULL(registry)), + val_(val) { + LOG(INFO) << "Counter::~Counter(): tid = " << tid_ << ", addr = " << this << ", val = " << val_; + std::lock_guard<RegistryLockType> reg_lock(*registry_->get_lock()); + CHECK(registry_->RegisterUnlocked(this)); + } + + ~Counter() { + LOG(INFO) << "Counter::~Counter(): tid = " << tid_ << ", addr = " << this << ", val = " << val_; + std::lock_guard<RegistryLockType> reg_lock(*registry_->get_lock()); + std::lock_guard<CounterLockType> self_lock(lock_); + LOG(INFO) << tid_ << ": deleting self from registry..."; + CHECK(registry_->UnregisterUnlocked(this)); + } + + uint64_t tid() { + return tid_; + } + + CounterLockType* get_lock() const { + return &lock_; + } + + void IncrementUnlocked() { + val_++; + } + + int GetValueUnlocked() { + return val_; + } + + private: + // We expect that most of the time this lock will be uncontended. + mutable CounterLockType lock_; + + // TID of thread that constructed this object. + const uint64_t tid_; + + // Register / unregister ourselves with this on construction / destruction. + CounterRegistry* const registry_; + + // Current value of the counter. + int val_; + + DISALLOW_COPY_AND_ASSIGN(Counter); +}; + +// Create a new THREAD_LOCAL Counter and loop an increment operation on it. +static void RegisterCounterAndLoopIncr(CounterRegistry* registry, + CountDownLatch* counters_ready, + CountDownLatch* reader_ready, + CountDownLatch* counters_done, + CountDownLatch* reader_done) { + BLOCK_STATIC_THREAD_LOCAL(Counter, counter, registry, 0); + // Inform the reader that we are alive. + counters_ready->CountDown(); + // Let the reader initialize before we start counting. + reader_ready->Wait(); + // Now rock & roll on the counting loop. 
+ for (int i = 0; i < kTargetCounterVal; i++) { + std::lock_guard<CounterLockType> l(*counter->get_lock()); + counter->IncrementUnlocked(); + } + // Let the reader know we're ready for him to verify our counts. + counters_done->CountDown(); + // Wait until the reader is done before we exit the thread, which will call + // delete on the Counter. + reader_done->Wait(); +} + +// Iterate over the registered counters and their values. +static uint64_t Iterate(CounterRegistry* registry, int expected_counters) { + uint64_t sum = 0; + int seen_counters = 0; + std::lock_guard<RegistryLockType> l(*registry->get_lock()); + for (Counter* counter : *registry->GetCountersUnlocked()) { + uint64_t value; + { + std::lock_guard<CounterLockType> l(*counter->get_lock()); + value = counter->GetValueUnlocked(); + } + LOG(INFO) << "tid " << counter->tid() << " (counter " << counter << "): " << value; + sum += value; + seen_counters++; + } + CHECK_EQ(expected_counters, seen_counters); + return sum; +} + +static void TestThreadLocalCounters(CounterRegistry* registry, const int num_threads) { + LOG(INFO) << "Starting threads..."; + vector<scoped_refptr<kudu::Thread> > threads; + + CountDownLatch counters_ready(num_threads); + CountDownLatch reader_ready(1); + CountDownLatch counters_done(num_threads); + CountDownLatch reader_done(1); + for (int i = 0; i < num_threads; i++) { + scoped_refptr<kudu::Thread> new_thread; + CHECK_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i), + &RegisterCounterAndLoopIncr, registry, &counters_ready, &reader_ready, + &counters_done, &reader_done, &new_thread)); + threads.push_back(new_thread); + } + + // Wait for all threads to start and register their Counters. + counters_ready.Wait(); + CHECK_EQ(0, Iterate(registry, num_threads)); + LOG(INFO) << "--"; + + // Let the counters start spinning. + reader_ready.CountDown(); + + // Try to catch them in the act, just for kicks. 
+ for (int i = 0; i < 2; i++) { + Iterate(registry, num_threads); + LOG(INFO) << "--"; + SleepFor(MonoDelta::FromMicroseconds(1)); + } + + // Wait until they're done and assure they sum up properly. + counters_done.Wait(); + LOG(INFO) << "Checking Counter sums..."; + CHECK_EQ(kTargetCounterVal * num_threads, Iterate(registry, num_threads)); + LOG(INFO) << "Counter sums add up!"; + reader_done.CountDown(); + + LOG(INFO) << "Joining & deleting threads..."; + for (scoped_refptr<kudu::Thread> thread : threads) { + CHECK_OK(ThreadJoiner(thread.get()).Join()); + } + LOG(INFO) << "Done."; +} + +TEST_F(ThreadLocalTest, TestConcurrentCounters) { + // Run this multiple times to ensure we don't leave remnants behind in the + // CounterRegistry. + CounterRegistry registry; + for (int i = 0; i < 3; i++) { + TestThreadLocalCounters(®istry, 8); + } +} + +// Test class that stores a string in a static thread local member. +// This class cannot be instantiated. The methods are all static. +class ThreadLocalString { + public: + static void set(std::string value); + static const std::string& get(); + private: + ThreadLocalString() { + } + DECLARE_STATIC_THREAD_LOCAL(std::string, value_); + DISALLOW_COPY_AND_ASSIGN(ThreadLocalString); +}; + +DEFINE_STATIC_THREAD_LOCAL(std::string, ThreadLocalString, value_); + +void ThreadLocalString::set(std::string value) { + INIT_STATIC_THREAD_LOCAL(std::string, value_); + *value_ = value; +} + +const std::string& ThreadLocalString::get() { + INIT_STATIC_THREAD_LOCAL(std::string, value_); + return *value_; +} + +static void RunAndAssign(CountDownLatch* writers_ready, + CountDownLatch *readers_ready, + CountDownLatch *all_done, + CountDownLatch *threads_exiting, + const std::string& in, + std::string* out) { + writers_ready->Wait(); + // Ensure it starts off as an empty string. 
+ CHECK_EQ("", ThreadLocalString::get()); + ThreadLocalString::set(in); + + readers_ready->Wait(); + out->assign(ThreadLocalString::get()); + all_done->Wait(); + threads_exiting->CountDown(); +} + +TEST_F(ThreadLocalTest, TestTLSMember) { + const int num_threads = 8; + + vector<CountDownLatch*> writers_ready; + vector<CountDownLatch*> readers_ready; + vector<std::string*> out_strings; + vector<scoped_refptr<kudu::Thread> > threads; + + ElementDeleter writers_deleter(&writers_ready); + ElementDeleter readers_deleter(&readers_ready); + ElementDeleter out_strings_deleter(&out_strings); + + CountDownLatch all_done(1); + CountDownLatch threads_exiting(num_threads); + + LOG(INFO) << "Starting threads..."; + for (int i = 0; i < num_threads; i++) { + writers_ready.push_back(new CountDownLatch(1)); + readers_ready.push_back(new CountDownLatch(1)); + out_strings.push_back(new std::string()); + scoped_refptr<kudu::Thread> new_thread; + CHECK_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i), + &RunAndAssign, writers_ready[i], readers_ready[i], + &all_done, &threads_exiting, Substitute("$0", i), out_strings[i], &new_thread)); + threads.push_back(new_thread); + } + + // Unlatch the threads in order. + LOG(INFO) << "Writing to thread locals..."; + for (int i = 0; i < num_threads; i++) { + writers_ready[i]->CountDown(); + } + LOG(INFO) << "Reading from thread locals..."; + for (int i = 0; i < num_threads; i++) { + readers_ready[i]->CountDown(); + } + all_done.CountDown(); + // threads_exiting acts as a memory barrier. 
+ threads_exiting.Wait(); + for (int i = 0; i < num_threads; i++) { + ASSERT_EQ(Substitute("$0", i), *out_strings[i]); + LOG(INFO) << "Read " << *out_strings[i]; + } + + LOG(INFO) << "Joining & deleting threads..."; + for (scoped_refptr<kudu::Thread> thread : threads) { + CHECK_OK(ThreadJoiner(thread.get()).Join()); + } +} + +TEST_F(ThreadLocalTest, TestThreadLocalCache) { + using TLC = ThreadLocalCache<int, string>; + TLC* tlc = TLC::GetInstance(); + + // Lookup in an empty cache should return nullptr. + ASSERT_EQ(nullptr, tlc->Lookup(0)); + + // Insert more items than the cache capacity. + const int kLastItem = TLC::kItemCapacity * 2; + for (int i = 1; i <= kLastItem ; i++) { + auto* item = tlc->EmplaceNew(i); + ASSERT_NE(nullptr, item); + *item = Substitute("item $0", i); + } + + // Looking up the most recent items should return them. + string* item = tlc->Lookup(kLastItem); + ASSERT_NE(nullptr, item); + EXPECT_EQ(*item, Substitute("item $0", kLastItem)); + + // Looking up evicted items should return nullptr. + ASSERT_EQ(nullptr, tlc->Lookup(1)); +} + +} // namespace threadlocal +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/mutex.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/mutex.cc b/be/src/kudu/util/mutex.cc new file mode 100644 index 0000000..bd9539b --- /dev/null +++ b/be/src/kudu/util/mutex.cc @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Portions (c) 2011 The Chromium Authors. + +#include "kudu/util/mutex.h" + +#include <gflags/gflags.h> +#include <glog/logging.h> + +#include "kudu/gutil/strings/substitute.h" +#include "kudu/gutil/walltime.h" +#include "kudu/util/debug-util.h" +#include "kudu/util/env.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/trace.h" + +using std::string; +using strings::Substitute; +using strings::SubstituteAndAppend; + +#ifndef NDEBUG +DEFINE_bool(debug_mutex_collect_stacktrace, false, + "Whether to collect a stacktrace on Mutex contention in a DEBUG build"); +TAG_FLAG(debug_mutex_collect_stacktrace, advanced); +TAG_FLAG(debug_mutex_collect_stacktrace, hidden); +#endif + +namespace kudu { + +Mutex::Mutex() +#ifndef NDEBUG + : owning_tid_(0), + stack_trace_(new StackTrace()) +#endif +{ +#ifndef NDEBUG + // In debug, setup attributes for lock error checking. + pthread_mutexattr_t mta; + int rv = pthread_mutexattr_init(&mta); + DCHECK_EQ(0, rv) << ". " << strerror(rv); + rv = pthread_mutexattr_settype(&mta, PTHREAD_MUTEX_ERRORCHECK); + DCHECK_EQ(0, rv) << ". " << strerror(rv); + rv = pthread_mutex_init(&native_handle_, &mta); + DCHECK_EQ(0, rv) << ". " << strerror(rv); + rv = pthread_mutexattr_destroy(&mta); + DCHECK_EQ(0, rv) << ". " << strerror(rv); +#else + // In release, go with the default lock attributes. + pthread_mutex_init(&native_handle_, NULL); +#endif +} + +Mutex::~Mutex() { + int rv = pthread_mutex_destroy(&native_handle_); + DCHECK_EQ(0, rv) << ". 
" << strerror(rv); +} + +bool Mutex::TryAcquire() { + int rv = pthread_mutex_trylock(&native_handle_); +#ifndef NDEBUG + DCHECK(rv == 0 || rv == EBUSY) << ". " << strerror(rv) << ". " << GetOwnerThreadInfo(); + if (rv == 0) { + CheckUnheldAndMark(); + } +#endif + return rv == 0; +} + +void Mutex::Acquire() { + // Optimize for the case when mutexes are uncontended. If they + // are contended, we'll have to go to sleep anyway, so the extra + // cost of branch mispredictions is moot. + // + // TryAcquire() is implemented as a simple CompareAndSwap inside + // pthreads so this does not require a system call. + if (PREDICT_TRUE(TryAcquire())) { + return; + } + + // If we weren't able to acquire the mutex immediately, then it's + // worth gathering timing information about the mutex acquisition. + MicrosecondsInt64 start_time = GetMonoTimeMicros(); + int rv = pthread_mutex_lock(&native_handle_); + DCHECK_EQ(0, rv) << ". " << strerror(rv) +#ifndef NDEBUG + << ". " << GetOwnerThreadInfo() +#endif + ; // NOLINT(whitespace/semicolon) + MicrosecondsInt64 end_time = GetMonoTimeMicros(); + + int64_t wait_time = end_time - start_time; + if (wait_time > 0) { + TRACE_COUNTER_INCREMENT("mutex_wait_us", wait_time); + } + +#ifndef NDEBUG + CheckUnheldAndMark(); +#endif +} + +void Mutex::Release() { +#ifndef NDEBUG + CheckHeldAndUnmark(); +#endif + int rv = pthread_mutex_unlock(&native_handle_); + DCHECK_EQ(0, rv) << ". 
" << strerror(rv); +} + +#ifndef NDEBUG +void Mutex::AssertAcquired() const { + DCHECK_EQ(Env::Default()->gettid(), owning_tid_); +} + +void Mutex::CheckHeldAndUnmark() { + AssertAcquired(); + owning_tid_ = 0; + if (FLAGS_debug_mutex_collect_stacktrace) { + stack_trace_->Reset(); + } +} + +void Mutex::CheckUnheldAndMark() { + DCHECK_EQ(0, owning_tid_); + owning_tid_ = Env::Default()->gettid(); + if (FLAGS_debug_mutex_collect_stacktrace) { + stack_trace_->Collect(); + } +} + +string Mutex::GetOwnerThreadInfo() const { + string str = Substitute("Owner tid: $0; Self tid: $1; ", owning_tid_, Env::Default()->gettid()); + if (FLAGS_debug_mutex_collect_stacktrace) { + SubstituteAndAppend(&str, "Owner stack:\n$0", stack_trace_->Symbolize()); + } else { + str += "To collect the owner stack trace, enable the flag --debug_mutex_collect_stacktrace"; + } + return str; +} + +#endif + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/mutex.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/mutex.h b/be/src/kudu/util/mutex.h new file mode 100644 index 0000000..9277ac0 --- /dev/null +++ b/be/src/kudu/util/mutex.h @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_MUTEX_H +#define KUDU_UTIL_MUTEX_H + +#include <pthread.h> +#include <sys/types.h> + +#include <string> + +#include <glog/logging.h> + +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/macros.h" + +namespace kudu { + +class StackTrace; + +// A lock built around pthread_mutex_t. Does not allow recursion. +// +// The following checks will be performed in DEBUG mode: +// Acquire(), TryAcquire() - the lock isn't already held. +// Release() - the lock is already held by this thread. +// +class Mutex { + public: + Mutex(); + ~Mutex(); + + void Acquire(); + void Release(); + bool TryAcquire(); + + void lock() { Acquire(); } + void unlock() { Release(); } + bool try_lock() { return TryAcquire(); } + +#ifndef NDEBUG + void AssertAcquired() const; +#else + void AssertAcquired() const {} +#endif + + private: + friend class ConditionVariable; + + pthread_mutex_t native_handle_; + +#ifndef NDEBUG + // Members and routines taking care of locks assertions. + void CheckHeldAndUnmark(); + void CheckUnheldAndMark(); + std::string GetOwnerThreadInfo() const; + + // All private data is implicitly protected by native_handle_. + // Be VERY careful to only access members under that lock. + pid_t owning_tid_; + gscoped_ptr<StackTrace> stack_trace_; +#endif + + DISALLOW_COPY_AND_ASSIGN(Mutex); +}; + +// A helper class that acquires the given Lock while the MutexLock is in scope. +class MutexLock { + public: + struct AlreadyAcquired {}; + + // Acquires 'lock' (must be unheld) and wraps around it. + // + // Sample usage: + // { + // MutexLock l(lock_); // acquired + // ... + // } // released + explicit MutexLock(Mutex& lock) + : lock_(&lock), + owned_(true) { + lock_->Acquire(); + } + + // Wraps around 'lock' (must already be held by this thread). + // + // Sample usage: + // { + // lock_.Acquire(); // acquired + // ... 
+ // MutexLock l(lock_, AlreadyAcquired()); + // ... + // } // released + MutexLock(Mutex& lock, const AlreadyAcquired&) + : lock_(&lock), + owned_(true) { + lock_->AssertAcquired(); + } + + void Lock() { + DCHECK(!owned_); + lock_->Acquire(); + owned_ = true; + } + + void Unlock() { + DCHECK(owned_); + lock_->AssertAcquired(); + lock_->Release(); + owned_ = false; + } + + ~MutexLock() { + if (owned_) { + Unlock(); + } + } + + bool OwnsLock() const { + return owned_; + } + + private: + Mutex* lock_; + bool owned_; + DISALLOW_COPY_AND_ASSIGN(MutexLock); +}; + +} // namespace kudu +#endif /* KUDU_UTIL_MUTEX_H */
