http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/spinlock_profiling.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/spinlock_profiling.cc b/be/src/kudu/util/spinlock_profiling.cc new file mode 100644 index 0000000..001f8d5 --- /dev/null +++ b/be/src/kudu/util/spinlock_profiling.cc @@ -0,0 +1,348 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/spinlock_profiling.h" + +#include <sstream> + +#include <glog/logging.h> +#include <gflags/gflags.h> + +#include "kudu/gutil/atomicops.h" +#include "kudu/gutil/basictypes.h" +#include "kudu/gutil/macros.h" +#include "kudu/gutil/spinlock.h" +#include "kudu/gutil/strings/human_readable.h" +#include "kudu/gutil/sysinfo.h" +#include "kudu/util/debug-util.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/metrics.h" +#include "kudu/util/striped64.h" +#include "kudu/util/trace.h" + +DEFINE_int32(lock_contention_trace_threshold_cycles, + 2000000, // 2M cycles should be about 1ms + "If acquiring a spinlock takes more than this number of " + "cycles, and a Trace is currently active, then the current " + "stack trace is logged to the trace buffer."); +TAG_FLAG(lock_contention_trace_threshold_cycles, hidden); + +METRIC_DEFINE_gauge_uint64(server, spinlock_contention_time, + "Spinlock Contention Time", kudu::MetricUnit::kMicroseconds, + "Amount of time consumed by contention on internal spinlocks since the server " + "started. If this increases rapidly, it may indicate a performance issue in Kudu " + "internals triggered by a particular workload and warrant investigation.", + kudu::EXPOSE_AS_COUNTER); +METRIC_DEFINE_gauge_uint64(server, tcmalloc_contention_time, + "TCMalloc Contention Time", kudu::MetricUnit::kMicroseconds, + "Amount of time consumed by contention on tcmalloc's locks since the server " + "started. 
If this increases rapidly, it may indicate a performance issue in Kudu " + "internals triggered by a particular workload and warrant investigation.", + kudu::EXPOSE_AS_COUNTER); + + +using base::SpinLock; +using base::SpinLockHolder; + +namespace kudu { + +static const double kMicrosPerSecond = 1000000.0; + +static LongAdder* g_contended_cycles = nullptr; + +namespace { + +// We can't use LongAdder for tcmalloc contention, because its +// implementation can allocate memory, and doing so is prohibited +// in the tcmalloc contention callback. +// +// We pad and align this struct to two cachelines to avoid any false +// sharing with the g_contended_cycles counter or any other globals. +struct PaddedAtomic64 { + Atomic64 val; + char padding[CACHELINE_SIZE * 2 - sizeof(Atomic64)]; +} CACHELINE_ALIGNED; +static PaddedAtomic64 g_tcmalloc_contention; + +// Implements a very simple linear-probing hashtable of stack traces with +// a fixed number of entries. +// +// Threads experiencing contention record their stacks into this hashtable, +// or increment an already-existing entry. Each entry has its own lock, +// but we can "skip" an entry under contention, and spread out a single stack +// into multiple buckets if necessary. +// +// A thread collecting a profile collects stack traces out of the hash table +// and resets the counts to 0 as they are collected. +class ContentionStacks { + public: + ContentionStacks() + : dropped_samples_(0) { + } + + // Add a stack trace to the table. + void AddStack(const StackTrace& s, int64_t cycles); + + // Flush stacks from the buffer to 'out'. See the docs for FlushSynchronizationProfile() + // in spinlock_profiling.h for details on format. + // + // On return, guarantees that any stack traces that were present at the beginning of + // the call have been flushed. However, new stacks can be added concurrently with this call. 
+ void Flush(std::ostringstream* out, int64_t* dropped); + + private: + + // Collect the next sample from the underlying buffer, and set it back to 0 count + // (thus marking it as "empty"). + // + // 'iterator' serves as a way to keep track of the current position in the buffer. + // Callers should initially set it to 0, and then pass the same pointer to each + // call to CollectSample. This serves to loop through the collected samples. + bool CollectSample(uint64_t* iterator, StackTrace* s, int64_t* trip_count, int64_t* cycles); + + // Hashtable entry. + struct Entry { + Entry() : trip_count(0), + cycle_count(0) { + } + + // Protects all other entries. + SpinLock lock; + + // The number of times we've experienced contention with a stack trace equal + // to 'trace'. + // + // If this is 0, then the entry is "unclaimed" and the other fields are not + // considered valid. + int64_t trip_count; + + // The total number of cycles spent waiting at this stack trace. + int64_t cycle_count; + + // A cached hashcode of the trace. + uint64_t hash; + + // The actual stack trace. + StackTrace trace; + }; + + enum { + kNumEntries = 1024, + kNumLinearProbeAttempts = 4 + }; + Entry entries_[kNumEntries]; + + // The number of samples which were dropped due to contention on this structure or + // due to the hashtable being too full. + AtomicInt<int64_t> dropped_samples_; +}; + +Atomic32 g_profiling_enabled = 0; +ContentionStacks* g_contention_stacks = nullptr; + +void ContentionStacks::AddStack(const StackTrace& s, int64_t cycles) { + uint64_t hash = s.HashCode(); + + // Linear probe up to 4 attempts before giving up + for (int i = 0; i < kNumLinearProbeAttempts; i++) { + Entry* e = &entries_[(hash + i) % kNumEntries]; + if (!e->lock.TryLock()) { + // If we fail to lock it, we can safely just use a different slot. + // It's OK if a single stack shows up multiple times, because pprof + // aggregates them in the end anyway. 
+ continue; + } + + if (e->trip_count == 0) { + // It's an un-claimed slot. Claim it. + e->hash = hash; + e->trace.CopyFrom(s); + } else if (e->hash != hash || !e->trace.Equals(s)) { + // It's claimed by a different stack trace. + e->lock.Unlock(); + continue; + } + + // Contribute to the stats for this stack. + e->cycle_count += cycles; + e->trip_count++; + e->lock.Unlock(); + return; + } + + // If we failed to find a matching hashtable slot, or we hit lock contention + // trying to record our sample, add it to the dropped sample count. + dropped_samples_.Increment(); +} + +void ContentionStacks::Flush(std::ostringstream* out, int64_t* dropped) { + uint64_t iterator = 0; + StackTrace t; + int64_t cycles; + int64_t count; + while (g_contention_stacks->CollectSample(&iterator, &t, &count, &cycles)) { + *out << cycles << "\t" << count + << " @ " << t.ToHexString(StackTrace::NO_FIX_CALLER_ADDRESSES) + << std::endl; + } + + *dropped += dropped_samples_.Exchange(0); +} + +bool ContentionStacks::CollectSample(uint64_t* iterator, StackTrace* s, int64_t* trip_count, + int64_t* cycles) { + while (*iterator < kNumEntries) { + Entry* e = &entries_[(*iterator)++]; + SpinLockHolder l(&e->lock); + if (e->trip_count == 0) continue; + + *trip_count = e->trip_count; + *cycles = e->cycle_count; + s->CopyFrom(e->trace); + + e->trip_count = 0; + e->cycle_count = 0; + return true; + } + + // Looped through the whole array and found nothing. + return false; +} + + +void SubmitSpinLockProfileData(const void *contendedlock, int64 wait_cycles) { + TRACE_COUNTER_INCREMENT("spinlock_wait_cycles", wait_cycles); + bool profiling_enabled = base::subtle::Acquire_Load(&g_profiling_enabled); + bool long_wait_time = wait_cycles > FLAGS_lock_contention_trace_threshold_cycles; + // Short circuit this function quickly in the common case. 
+ if (PREDICT_TRUE(!profiling_enabled && !long_wait_time)) { + return; + } + + static __thread bool in_func = false; + if (in_func) return; // non-re-entrant + in_func = true; + + StackTrace stack; + stack.Collect(); + + if (profiling_enabled) { + DCHECK_NOTNULL(g_contention_stacks)->AddStack(stack, wait_cycles); + } + + if (PREDICT_FALSE(long_wait_time)) { + Trace* t = Trace::CurrentTrace(); + if (t) { + double seconds = static_cast<double>(wait_cycles) / base::CyclesPerSecond(); + char backtrace_buffer[1024]; + stack.StringifyToHex(backtrace_buffer, arraysize(backtrace_buffer)); + TRACE_TO(t, "Waited $0 on lock $1. stack: $2", + HumanReadableElapsedTime::ToShortString(seconds), contendedlock, + backtrace_buffer); + } + } + + LongAdder* la = reinterpret_cast<LongAdder*>( + base::subtle::Acquire_Load(reinterpret_cast<AtomicWord*>(&g_contended_cycles))); + if (la) { + la->IncrementBy(wait_cycles); + } + + in_func = false; +} + +void DoInit() { + base::subtle::Release_Store(reinterpret_cast<AtomicWord*>(&g_contention_stacks), + reinterpret_cast<uintptr_t>(new ContentionStacks())); + base::subtle::Release_Store(reinterpret_cast<AtomicWord*>(&g_contended_cycles), + reinterpret_cast<uintptr_t>(new LongAdder())); +} + +} // anonymous namespace + +void InitSpinLockContentionProfiling() { + static GoogleOnceType once = GOOGLE_ONCE_INIT; + GoogleOnceInit(&once, DoInit); +} + + +void RegisterSpinLockContentionMetrics(const scoped_refptr<MetricEntity>& entity) { + InitSpinLockContentionProfiling(); + entity->NeverRetire( + METRIC_spinlock_contention_time.InstantiateFunctionGauge( + entity, Bind(&GetSpinLockContentionMicros))); + entity->NeverRetire( + METRIC_tcmalloc_contention_time.InstantiateFunctionGauge( + entity, Bind(&GetTcmallocContentionMicros))); + +} + +uint64_t GetSpinLockContentionMicros() { + int64_t wait_cycles = DCHECK_NOTNULL(g_contended_cycles)->Value(); + double micros = static_cast<double>(wait_cycles) / base::CyclesPerSecond() + * kMicrosPerSecond; + 
return implicit_cast<int64_t>(micros); +} + +uint64_t GetTcmallocContentionMicros() { + int64_t wait_cycles = base::subtle::NoBarrier_Load(&g_tcmalloc_contention.val); + double micros = static_cast<double>(wait_cycles) / base::CyclesPerSecond() + * kMicrosPerSecond; + return implicit_cast<int64_t>(micros); +} + +void StartSynchronizationProfiling() { + InitSpinLockContentionProfiling(); + base::subtle::Barrier_AtomicIncrement(&g_profiling_enabled, 1); +} + +void FlushSynchronizationProfile(std::ostringstream* out, + int64_t* drop_count) { + CHECK_NOTNULL(g_contention_stacks)->Flush(out, drop_count); +} + +void StopSynchronizationProfiling() { + InitSpinLockContentionProfiling(); + CHECK_GE(base::subtle::Barrier_AtomicIncrement(&g_profiling_enabled, -1), 0); +} + +} // namespace kudu + +// The hook expected by gutil is in the gutil namespace. Simply forward into the +// kudu namespace so we don't need to qualify everything. +namespace gutil { +void SubmitSpinLockProfileData(const void *contendedlock, int64 wait_cycles) { + kudu::SubmitSpinLockProfileData(contendedlock, wait_cycles); +} +} // namespace gutil + +// tcmalloc's internal spinlocks also support submitting contention metrics +// using the base::SubmitSpinLockProfileData weak symbol. However, this function might be +// called while tcmalloc's heap lock is held. Thus, we cannot allocate memory here or else +// we risk a deadlock. So, this implementation just does the bare minimum to expose +// tcmalloc contention. +namespace base { +void SubmitSpinLockProfileData(const void* contendedlock, int64 wait_cycles) { +#if !defined(__APPLE__) + auto t = kudu::Trace::CurrentTrace(); + if (t) { + t->metrics()->IncrementTcmallocContentionCycles(wait_cycles); + } +#endif // !defined(__APPLE__) + base::subtle::NoBarrier_AtomicIncrement(&kudu::g_tcmalloc_contention.val, wait_cycles); +} +} // namespace base
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/spinlock_profiling.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/spinlock_profiling.h b/be/src/kudu/util/spinlock_profiling.h new file mode 100644 index 0000000..d5b5f15 --- /dev/null +++ b/be/src/kudu/util/spinlock_profiling.h @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_SPINLOCK_PROFILING_H +#define KUDU_UTIL_SPINLOCK_PROFILING_H + +#include <iosfwd> + +#include "kudu/gutil/macros.h" +#include "kudu/gutil/ref_counted.h" + +namespace kudu { + +class MetricEntity; + +// Enable instrumentation of spinlock contention. +// +// The first call performs the one-time setup of the global contention +// state (the stack-trace table and the contended-cycles counter); any +// later call is a no-op. Calling this from somewhere reachable in your +// code also ensures that the spinlock_profiling.cc object file gets +// linked into your executable, so that gcc doesn't omit the underlying +// module from the binary. +void InitSpinLockContentionProfiling(); + +// Return the total number of microseconds spent in spinlock contention +// since the server started. 
+uint64_t GetSpinLockContentionMicros(); + +// Return the total number of microseconds spent in tcmalloc contention +// since the server started. +uint64_t GetTcmallocContentionMicros(); + +// Register metrics in the given server entity which measure the amount of +// spinlock contention. +void RegisterSpinLockContentionMetrics(const scoped_refptr<MetricEntity>& entity); + +// Enable process-wide synchronization profiling. +// +// While profiling is enabled, spinlock contention will be recorded in a buffer. +// The caller should periodically call FlushSynchronizationProfile() to empty +// the buffer, or else profiles may be dropped. +void StartSynchronizationProfiling(); + +// Flush the current buffer of contention profile samples to the given stream. +// +// Each stack trace that has been observed results in at least one line of the +// following format: +// <cycles> <trip count> @ <hex stack trace> +// +// Flushing the data also clears the current buffer of trace samples. +// This may be called while synchronization profiling is enabled or after it has +// been disabled. +// +// '*drop_count' will be incremented by the number of samples which were dropped +// because the contention buffer was too full or was itself under contention. +// If profiling is enabled during this call, then '*drop_count' may be slightly +// out-of-date with respect to the returned samples. +void FlushSynchronizationProfile(std::ostringstream* out, int64_t* drop_count); + +// Stop collecting contention profiles. 
+void StopSynchronizationProfiling(); + +} // namespace kudu +#endif /* KUDU_UTIL_SPINLOCK_PROFILING_H */ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/stack_watchdog-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/stack_watchdog-test.cc b/be/src/kudu/util/stack_watchdog-test.cc new file mode 100644 index 0000000..9bbb097 --- /dev/null +++ b/be/src/kudu/util/stack_watchdog-test.cc @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/kernel_stack_watchdog.h" + +#include <gflags/gflags.h> +#include <string> +#include <vector> + +#include "kudu/gutil/strings/join.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/stopwatch.h" +#include "kudu/util/test_util.h" + +using std::string; +using std::vector; +using strings::Substitute; + +DECLARE_int32(hung_task_check_interval_ms); + +namespace kudu { + +class StackWatchdogTest : public KuduTest { + public: + virtual void SetUp() OVERRIDE { + KuduTest::SetUp(); + KernelStackWatchdog::GetInstance()->SaveLogsForTests(true); + FLAGS_hung_task_check_interval_ms = 10; + } +}; + +// The KernelStackWatchdog is only enabled on Linux, since we can't get kernel +// stack traces on other platforms. +#if defined(__linux__) +TEST_F(StackWatchdogTest, TestWatchdog) { + vector<string> log; + { + SCOPED_WATCH_STACK(20); + for (int i = 0; i < 50; i++) { + SleepFor(MonoDelta::FromMilliseconds(100)); + log = KernelStackWatchdog::GetInstance()->LoggedMessagesForTests(); + // Wait for several samples, since it's possible that we get unlucky + // and the watchdog sees us just before or after a sleep. + if (log.size() > 5) { + break; + } + } + } + string s = JoinStrings(log, "\n"); + ASSERT_STR_CONTAINS(s, "TestWatchdog_Test::TestBody()"); + ASSERT_STR_CONTAINS(s, "nanosleep"); +} +#endif + +// Test that SCOPED_WATCH_STACK scopes can be nested. +TEST_F(StackWatchdogTest, TestNestedScopes) { + vector<string> log; + int line1; + int line2; + { + SCOPED_WATCH_STACK(20); line1 = __LINE__; + { + SCOPED_WATCH_STACK(20); line2 = __LINE__; + for (int i = 0; i < 50; i++) { + SleepFor(MonoDelta::FromMilliseconds(100)); + log = KernelStackWatchdog::GetInstance()->LoggedMessagesForTests(); + if (log.size() > 3) { + break; + } + } + } + } + + // Verify that both nested scopes were collected. 
+ string s = JoinStrings(log, "\n"); + ASSERT_STR_CONTAINS(s, Substitute("stack_watchdog-test.cc:$0", line1)); + ASSERT_STR_CONTAINS(s, Substitute("stack_watchdog-test.cc:$0", line2)); +} + +TEST_F(StackWatchdogTest, TestPerformance) { + // Reset the check interval to be reasonable. Otherwise the benchmark + // wastes a lot of CPU running the watchdog thread too often. + FLAGS_hung_task_check_interval_ms = 500; + LOG_TIMING(INFO, "1M SCOPED_WATCH_STACK()s") { + for (int i = 0; i < 1000000; i++) { + SCOPED_WATCH_STACK(100); + } + } +} +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/status-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/status-test.cc b/be/src/kudu/util/status-test.cc new file mode 100644 index 0000000..ca33b89 --- /dev/null +++ b/be/src/kudu/util/status-test.cc @@ -0,0 +1,98 @@ +// Some portions Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. 
+ +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include <errno.h> +#include <vector> +#include "kudu/util/status.h" +#include "kudu/util/test_util.h" + +using std::string; + +namespace kudu { + +TEST(StatusTest, TestPosixCode) { + Status ok = Status::OK(); + ASSERT_EQ(0, ok.posix_code()); + Status file_error = Status::IOError("file error", Slice(), ENOTDIR); + ASSERT_EQ(ENOTDIR, file_error.posix_code()); +} + +TEST(StatusTest, TestToString) { + Status file_error = Status::IOError("file error", Slice(), ENOTDIR); + ASSERT_EQ(string("IO error: file error (error 20)"), file_error.ToString()); +} + +TEST(StatusTest, TestClonePrepend) { + Status file_error = Status::IOError("file error", "msg2", ENOTDIR); + Status appended = file_error.CloneAndPrepend("Heading"); + ASSERT_EQ(string("IO error: Heading: file error: msg2 (error 20)"), appended.ToString()); +} + +TEST(StatusTest, TestCloneAppend) { + Status remote_error = Status::RemoteError("Application error"); + Status appended = remote_error.CloneAndAppend(Status::NotFound("Unknown tablet").ToString()); + ASSERT_EQ(string("Remote error: Application error: Not found: Unknown tablet"), + appended.ToString()); +} + +TEST(StatusTest, TestMemoryUsage) { + ASSERT_EQ(0, Status::OK().memory_footprint_excluding_this()); + ASSERT_GT(Status::IOError( + "file error", "some other thing", ENOTDIR).memory_footprint_excluding_this(), 0); +} + +TEST(StatusTest, TestMoveConstructor) { + // OK->OK move should do nothing. + { + Status src = Status::OK(); + Status dst = std::move(src); + ASSERT_OK(src); + ASSERT_OK(dst); + } + + // Moving a not-OK status into a new one should make the moved status + // "OK". + { + Status src = Status::NotFound("foo"); + Status dst = std::move(src); + ASSERT_OK(src); + ASSERT_EQ("Not found: foo", dst.ToString()); + } +} + +TEST(StatusTest, TestMoveAssignment) { + // OK->Bad move should clear the source status and also make the + // destination status OK. 
+ { + Status src = Status::OK(); + Status dst = Status::NotFound("orig dst"); + dst = std::move(src); + ASSERT_OK(src); + ASSERT_OK(dst); + } + + // Bad->Bad move. + { + Status src = Status::NotFound("orig src"); + Status dst = Status::NotFound("orig dst"); + dst = std::move(src); + ASSERT_OK(src); + ASSERT_EQ("Not found: orig src", dst.ToString()); + } + + // Bad->OK move + { + Status src = Status::NotFound("orig src"); + Status dst = Status::OK(); + dst = std::move(src); + ASSERT_OK(src); + ASSERT_EQ("Not found: orig src", dst.ToString()); + } +} + + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/status.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/status.cc b/be/src/kudu/util/status.cc new file mode 100644 index 0000000..9f88da1 --- /dev/null +++ b/be/src/kudu/util/status.cc @@ -0,0 +1,162 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "kudu/util/status.h" + +#include <stdio.h> +#include <stdint.h> + +#include "kudu/gutil/strings/fastmem.h" +#include "kudu/util/malloc.h" + +namespace kudu { + +const char* Status::CopyState(const char* state) { + uint32_t size; + strings::memcpy_inlined(&size, state, sizeof(size)); + auto result = new char[size + 7]; + strings::memcpy_inlined(result, state, size + 7); + return result; +} + +Status::Status(Code code, const Slice& msg, const Slice& msg2, + int16_t posix_code) { + DCHECK(code != kOk); + const uint32_t len1 = msg.size(); + const uint32_t len2 = msg2.size(); + const uint32_t size = len1 + (len2 ? 
(2 + len2) : 0); + auto result = new char[size + 7]; + memcpy(result, &size, sizeof(size)); + result[4] = static_cast<char>(code); + memcpy(result + 5, &posix_code, sizeof(posix_code)); + memcpy(result + 7, msg.data(), len1); + if (len2) { + result[7 + len1] = ':'; + result[8 + len1] = ' '; + memcpy(result + 9 + len1, msg2.data(), len2); + } + state_ = result; +} + +std::string Status::CodeAsString() const { + if (state_ == nullptr) { + return "OK"; + } + + const char* type; + switch (code()) { + case kOk: + type = "OK"; + break; + case kNotFound: + type = "Not found"; + break; + case kCorruption: + type = "Corruption"; + break; + case kNotSupported: + type = "Not implemented"; + break; + case kInvalidArgument: + type = "Invalid argument"; + break; + case kIOError: + type = "IO error"; + break; + case kAlreadyPresent: + type = "Already present"; + break; + case kRuntimeError: + type = "Runtime error"; + break; + case kNetworkError: + type = "Network error"; + break; + case kIllegalState: + type = "Illegal state"; + break; + case kNotAuthorized: + type = "Not authorized"; + break; + case kAborted: + type = "Aborted"; + break; + case kRemoteError: + type = "Remote error"; + break; + case kServiceUnavailable: + type = "Service unavailable"; + break; + case kTimedOut: + type = "Timed out"; + break; + case kUninitialized: + type = "Uninitialized"; + break; + case kConfigurationError: + type = "Configuration error"; + break; + case kIncomplete: + type = "Incomplete"; + break; + case kEndOfFile: + type = "End of file"; + break; + } + return std::string(type); +} + +std::string Status::ToString() const { + std::string result(CodeAsString()); + if (state_ == nullptr) { + return result; + } + + result.append(": "); + Slice msg = message(); + result.append(reinterpret_cast<const char*>(msg.data()), msg.size()); + int16_t posix = posix_code(); + if (posix != -1) { + char buf[64]; + snprintf(buf, sizeof(buf), " (error %d)", posix); + result.append(buf); + } + return result; +} 
+ +Slice Status::message() const { + if (state_ == nullptr) { + return Slice(); + } + + uint32_t length; + memcpy(&length, state_, sizeof(length)); + return Slice(state_ + 7, length); +} + +int16_t Status::posix_code() const { + if (state_ == nullptr) { + return 0; + } + int16_t posix_code; + memcpy(&posix_code, state_ + 5, sizeof(posix_code)); + return posix_code; +} + +Status Status::CloneAndPrepend(const Slice& msg) const { + return Status(code(), msg, message(), posix_code()); +} + +Status Status::CloneAndAppend(const Slice& msg) const { + return Status(code(), message(), msg, posix_code()); +} + +size_t Status::memory_footprint_excluding_this() const { + return state_ ? kudu_malloc_usable_size(state_) : 0; +} + +size_t Status::memory_footprint_including_this() const { + return kudu_malloc_usable_size(this) + memory_footprint_excluding_this(); +} +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/status.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/status.h b/be/src/kudu/util/status.h new file mode 100644 index 0000000..93a25a6 --- /dev/null +++ b/be/src/kudu/util/status.h @@ -0,0 +1,433 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// A Status encapsulates the result of an operation. It may indicate success, +// or it may indicate an error with an associated error message. +// +// Multiple threads can invoke const methods on a Status without +// external synchronization, but if any of the threads may call a +// non-const method, all threads accessing the same Status must use +// external synchronization. 
+ +#ifndef KUDU_UTIL_STATUS_H_ +#define KUDU_UTIL_STATUS_H_ + +#include <stdint.h> +#include <string> + +#ifdef KUDU_HEADERS_NO_STUBS +#include "kudu/gutil/macros.h" +#include "kudu/gutil/port.h" +#else +#include "kudu/client/stubs.h" +#endif + +#include "kudu/util/kudu_export.h" +#include "kudu/util/slice.h" + +// NOTE: each 'do { ... } while (0);' macro defined below already ends in a +// semicolon, so the ';' a caller writes after the macro becomes an extra +// empty statement. Wrap the macro in braces when it is used as the body of +// an 'if' that has an 'else'. + +/// @brief Return the given status if it is not @c OK. +#define KUDU_RETURN_NOT_OK(s) do { \ + const ::kudu::Status& _s = (s); \ + if (PREDICT_FALSE(!_s.ok())) return _s; \ + } while (0); + +/// @brief Return the given status if it is not OK, but first clone it and +/// prepend the given message. +#define KUDU_RETURN_NOT_OK_PREPEND(s, msg) do { \ + const ::kudu::Status& _s = (s); \ + if (PREDICT_FALSE(!_s.ok())) return _s.CloneAndPrepend(msg); \ + } while (0); + +/// @brief Return @c to_return if @c to_call returns a bad status. +/// The substitution for 'to_return' may reference the variable +/// @c s for the bad status. +#define KUDU_RETURN_NOT_OK_RET(to_call, to_return) do { \ + const ::kudu::Status& s = (to_call); \ + if (PREDICT_FALSE(!s.ok())) return (to_return); \ + } while (0); + +/// @brief Emit a warning if @c to_call returns a bad status. +#define KUDU_WARN_NOT_OK(to_call, warning_prefix) do { \ + const ::kudu::Status& _s = (to_call); \ + if (PREDICT_FALSE(!_s.ok())) { \ + KUDU_LOG(WARNING) << (warning_prefix) << ": " << _s.ToString(); \ + } \ + } while (0); + +/// @brief Log the given status and return immediately. +#define KUDU_LOG_AND_RETURN(level, status) do { \ + const ::kudu::Status& _s = (status); \ + KUDU_LOG(level) << _s.ToString(); \ + return _s; \ + } while (0); + +/// @brief If the given status is not OK, log it and 'msg' at 'level' and return the status. 
+#define KUDU_RETURN_NOT_OK_LOG(s, level, msg) do { \ + const ::kudu::Status& _s = (s); \ + if (PREDICT_FALSE(!_s.ok())) { \ + KUDU_LOG(level) << "Status: " << _s.ToString() << " " << (msg); \ + return _s; \ + } \ + } while (0); + +/// @brief If @c to_call returns a bad status, CHECK immediately with +/// a logged message of @c msg followed by the status. +#define KUDU_CHECK_OK_PREPEND(to_call, msg) do { \ + const ::kudu::Status& _s = (to_call); \ + KUDU_CHECK(_s.ok()) << (msg) << ": " << _s.ToString(); \ + } while (0); + +/// @brief If the status is bad, CHECK immediately, appending the status to the +/// logged 'Bad status' message. +#define KUDU_CHECK_OK(s) KUDU_CHECK_OK_PREPEND(s, "Bad status") + +/// @brief If @c to_call returns a bad status, DCHECK immediately with +/// a logged message of @c msg followed by the status. +#define KUDU_DCHECK_OK_PREPEND(to_call, msg) do { \ + const ::kudu::Status& _s = (to_call); \ + KUDU_DCHECK(_s.ok()) << (msg) << ": " << _s.ToString(); \ + } while (0); + +/// @brief If the status is bad, DCHECK immediately, appending the status to the +/// logged 'Bad status' message. +#define KUDU_DCHECK_OK(s) KUDU_DCHECK_OK_PREPEND(s, "Bad status") + +/// @file status.h +/// +/// This header is used in both the Kudu build as well as in builds of +/// applications that use the Kudu C++ client. In the latter we need to be +/// careful to "namespace" our macros, to avoid colliding or overriding with +/// similarly named macros belonging to the application. +/// +/// KUDU_HEADERS_USE_SHORT_STATUS_MACROS handles this behavioral change. When +/// defined, we're building Kudu and: +/// @li Non-namespaced macros are allowed and mapped to the namespaced versions +/// defined above. +/// @li Namespaced versions of glog macros are mapped to the real glog macros +/// (otherwise the macros are defined in the C++ client stubs). 
+#ifdef KUDU_HEADERS_USE_SHORT_STATUS_MACROS +#define RETURN_NOT_OK KUDU_RETURN_NOT_OK +#define RETURN_NOT_OK_PREPEND KUDU_RETURN_NOT_OK_PREPEND +#define RETURN_NOT_OK_RET KUDU_RETURN_NOT_OK_RET +#define WARN_NOT_OK KUDU_WARN_NOT_OK +#define LOG_AND_RETURN KUDU_LOG_AND_RETURN +#define RETURN_NOT_OK_LOG KUDU_RETURN_NOT_OK_LOG +#define CHECK_OK_PREPEND KUDU_CHECK_OK_PREPEND +#define CHECK_OK KUDU_CHECK_OK +#define DCHECK_OK_PREPEND KUDU_DCHECK_OK_PREPEND +#define DCHECK_OK KUDU_DCHECK_OK + +// These are standard glog macros. +#define KUDU_LOG LOG +#define KUDU_CHECK CHECK +#define KUDU_DCHECK DCHECK +#endif + +namespace kudu { + +/// @brief A representation of an operation's outcome. +class KUDU_EXPORT Status { + public: + /// Create an object representing success status. + Status() : state_(NULL) { } + + ~Status() { delete[] state_; } + + /// Copy the specified status. + /// + /// @param [in] s + /// The status object to copy from. + Status(const Status& s); + + /// Assign the specified status. + /// + /// @param [in] s + /// The status object to assign from. + /// @return The reference to the modified object. + Status& operator=(const Status& s); + +#if __cplusplus >= 201103L + /// Move the specified status (C++11). + /// + /// @param [in] s + /// rvalue reference to a Status object. + Status(Status&& s); + + /// Assign the specified status using move semantics (C++11). + /// + /// @param [in] s + /// rvalue reference to a Status object. + /// @return The reference to the modified object. + Status& operator=(Status&& s); +#endif + + /// @return A success status. + static Status OK() { return Status(); } + + + /// @name Methods to build status objects for various types of errors. + /// + /// @param [in] msg + /// The informational message on the error. + /// @param [in] msg2 + /// Additional information on the error (optional). + /// @param [in] posix_code + /// POSIX error code, if applicable (optional). + /// @return The error status of an appropriate type. 
+ /// + ///@{ + static Status NotFound(const Slice& msg, const Slice& msg2 = Slice(), + int16_t posix_code = -1) { + return Status(kNotFound, msg, msg2, posix_code); + } + static Status Corruption(const Slice& msg, const Slice& msg2 = Slice(), + int16_t posix_code = -1) { + return Status(kCorruption, msg, msg2, posix_code); + } + static Status NotSupported(const Slice& msg, const Slice& msg2 = Slice(), + int16_t posix_code = -1) { + return Status(kNotSupported, msg, msg2, posix_code); + } + static Status InvalidArgument(const Slice& msg, const Slice& msg2 = Slice(), + int16_t posix_code = -1) { + return Status(kInvalidArgument, msg, msg2, posix_code); + } + static Status IOError(const Slice& msg, const Slice& msg2 = Slice(), + int16_t posix_code = -1) { + return Status(kIOError, msg, msg2, posix_code); + } + static Status AlreadyPresent(const Slice& msg, const Slice& msg2 = Slice(), + int16_t posix_code = -1) { + return Status(kAlreadyPresent, msg, msg2, posix_code); + } + static Status RuntimeError(const Slice& msg, const Slice& msg2 = Slice(), + int16_t posix_code = -1) { + return Status(kRuntimeError, msg, msg2, posix_code); + } + static Status NetworkError(const Slice& msg, const Slice& msg2 = Slice(), + int16_t posix_code = -1) { + return Status(kNetworkError, msg, msg2, posix_code); + } + static Status IllegalState(const Slice& msg, const Slice& msg2 = Slice(), + int16_t posix_code = -1) { + return Status(kIllegalState, msg, msg2, posix_code); + } + static Status NotAuthorized(const Slice& msg, const Slice& msg2 = Slice(), + int16_t posix_code = -1) { + return Status(kNotAuthorized, msg, msg2, posix_code); + } + static Status Aborted(const Slice& msg, const Slice& msg2 = Slice(), + int16_t posix_code = -1) { + return Status(kAborted, msg, msg2, posix_code); + } + static Status RemoteError(const Slice& msg, const Slice& msg2 = Slice(), + int16_t posix_code = -1) { + return Status(kRemoteError, msg, msg2, posix_code); + } + static Status 
ServiceUnavailable(const Slice& msg, const Slice& msg2 = Slice(), + int16_t posix_code = -1) { + return Status(kServiceUnavailable, msg, msg2, posix_code); + } + static Status TimedOut(const Slice& msg, const Slice& msg2 = Slice(), + int16_t posix_code = -1) { + return Status(kTimedOut, msg, msg2, posix_code); + } + static Status Uninitialized(const Slice& msg, const Slice& msg2 = Slice(), + int16_t posix_code = -1) { + return Status(kUninitialized, msg, msg2, posix_code); + } + static Status ConfigurationError(const Slice& msg, const Slice& msg2 = Slice(), + int16_t posix_code = -1) { + return Status(kConfigurationError, msg, msg2, posix_code); + } + static Status Incomplete(const Slice& msg, const Slice& msg2 = Slice(), + int64_t posix_code = -1) { + return Status(kIncomplete, msg, msg2, posix_code); + } + static Status EndOfFile(const Slice& msg, const Slice& msg2 = Slice(), + int64_t posix_code = -1) { + return Status(kEndOfFile, msg, msg2, posix_code); + } + ///@} + + /// @return @c true iff the status indicates success. + bool ok() const { return (state_ == NULL); } + + /// @return @c true iff the status indicates a NotFound error. + bool IsNotFound() const { return code() == kNotFound; } + + /// @return @c true iff the status indicates a Corruption error. + bool IsCorruption() const { return code() == kCorruption; } + + /// @return @c true iff the status indicates a NotSupported error. + bool IsNotSupported() const { return code() == kNotSupported; } + + /// @return @c true iff the status indicates an IOError. + bool IsIOError() const { return code() == kIOError; } + + /// @return @c true iff the status indicates an InvalidArgument error. + bool IsInvalidArgument() const { return code() == kInvalidArgument; } + + /// @return @c true iff the status indicates an AlreadyPresent error. + bool IsAlreadyPresent() const { return code() == kAlreadyPresent; } + + /// @return @c true iff the status indicates a RuntimeError. 
+ bool IsRuntimeError() const { return code() == kRuntimeError; } + + /// @return @c true iff the status indicates a NetworkError. + bool IsNetworkError() const { return code() == kNetworkError; } + + /// @return @c true iff the status indicates an IllegalState error. + bool IsIllegalState() const { return code() == kIllegalState; } + + /// @return @c true iff the status indicates a NotAuthorized error. + bool IsNotAuthorized() const { return code() == kNotAuthorized; } + + /// @return @c true iff the status indicates an Aborted error. + bool IsAborted() const { return code() == kAborted; } + + /// @return @c true iff the status indicates a RemoteError. + bool IsRemoteError() const { return code() == kRemoteError; } + + /// @return @c true iff the status indicates ServiceUnavailable. + bool IsServiceUnavailable() const { return code() == kServiceUnavailable; } + + /// @return @c true iff the status indicates TimedOut. + bool IsTimedOut() const { return code() == kTimedOut; } + + /// @return @c true iff the status indicates Uninitialized. + bool IsUninitialized() const { return code() == kUninitialized; } + + /// @return @c true iff the status indicates ConfigurationError. + bool IsConfigurationError() const { return code() == kConfigurationError; } + + /// @return @c true iff the status indicates Incomplete. + bool IsIncomplete() const { return code() == kIncomplete; } + + /// @return @c true iff the status indicates end of file. + bool IsEndOfFile() const { return code() == kEndOfFile; } + + /// @return A string representation of this status suitable for printing. + /// Returns the string "OK" for success. + std::string ToString() const; + + /// @return A string representation of the status code, without the message + /// text or POSIX code information. + std::string CodeAsString() const; + + /// This is similar to ToString, except that it does not include + /// the stringified error code or POSIX code. 
+ /// + /// @note The returned Slice is only valid as long as this Status object + /// remains live and unchanged. + /// + /// @return The message portion of the Status. For @c OK statuses, + /// this returns an empty string. + Slice message() const; + + /// @return The POSIX code associated with this Status object, + /// or @c -1 if there is none. + int16_t posix_code() const; + + /// Clone the object and add the specified prefix to the clone's message. + /// + /// @param [in] msg + /// The message to prepend. + /// @return A new Status object with the same state plus an additional + /// leading message. + Status CloneAndPrepend(const Slice& msg) const; + + /// Clone the object and add the specified suffix to the clone's message. + /// + /// @param [in] msg + /// The message to append. + /// @return A new Status object with the same state plus an additional + /// trailing message. + Status CloneAndAppend(const Slice& msg) const; + + /// @return The memory usage of this object without the object itself. + /// Should be used when embedded inside another object. + size_t memory_footprint_excluding_this() const; + + /// @return The memory usage of this object including the object itself. + /// Should be used when allocated on the heap. + size_t memory_footprint_including_this() const; + + private: + // OK status has a NULL state_. Otherwise, state_ is a new[] array + // of the following form: + // state_[0..3] == length of message + // state_[4] == code + // state_[5..6] == posix_code + // state_[7..] 
== message + const char* state_; + + enum Code { + kOk = 0, + kNotFound = 1, + kCorruption = 2, + kNotSupported = 3, + kInvalidArgument = 4, + kIOError = 5, + kAlreadyPresent = 6, + kRuntimeError = 7, + kNetworkError = 8, + kIllegalState = 9, + kNotAuthorized = 10, + kAborted = 11, + kRemoteError = 12, + kServiceUnavailable = 13, + kTimedOut = 14, + kUninitialized = 15, + kConfigurationError = 16, + kIncomplete = 17, + kEndOfFile = 18, + // NOTE: Remember to duplicate these constants into wire_protocol.proto and + // and to add StatusTo/FromPB ser/deser cases in wire_protocol.cc ! + // Also remember to make the same changes to the java client in Status.java. + // + // TODO: Move error codes into an error_code.proto or something similar. + }; + COMPILE_ASSERT(sizeof(Code) == 4, code_enum_size_is_part_of_abi); + + Code code() const { + return (state_ == NULL) ? kOk : static_cast<Code>(state_[4]); + } + + Status(Code code, const Slice& msg, const Slice& msg2, int16_t posix_code); + static const char* CopyState(const char* s); +}; + +inline Status::Status(const Status& s) { + state_ = (s.state_ == NULL) ? NULL : CopyState(s.state_); +} + +inline Status& Status::operator=(const Status& s) { + // The following condition catches both aliasing (when this == &s), + // and the common case where both s and *this are OK. + if (state_ != s.state_) { + delete[] state_; + state_ = (s.state_ == NULL) ? 
NULL : CopyState(s.state_); + } + return *this; +} + +#if __cplusplus >= 201103L +inline Status::Status(Status&& s) : state_(s.state_) { + s.state_ = nullptr; +} + +inline Status& Status::operator=(Status&& s) { + if (state_ != s.state_) { + delete[] state_; + state_ = s.state_; + s.state_ = nullptr; + } + return *this; +} +#endif + +} // namespace kudu + +#endif // KUDU_UTIL_STATUS_H_ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/status_callback.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/status_callback.cc b/be/src/kudu/util/status_callback.cc new file mode 100644 index 0000000..667bfec --- /dev/null +++ b/be/src/kudu/util/status_callback.cc @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
#include "kudu/util/status_callback.h"

#include "kudu/util/status.h"

using std::string;

namespace kudu {

// Accepts a Status and deliberately does nothing with it.
void DoNothingStatusCB(const Status& status) {}

// Logs 'message' followed by the stringified status at FATAL severity when
// 'status' is not OK; does nothing for an OK status.
void CrashIfNotOkStatusCB(const string& message, const Status& status) {
  if (PREDICT_FALSE(!status.ok())) {
    LOG(FATAL) << message << ": " << status.ToString();
  }
}

// Closure that performs no work and always reports success.
Status DoNothingStatusClosure() { return Status::OK(); }

} // end namespace kudu
// http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/status_callback.h
// ----------------------------------------------------------------------
// diff --git a/be/src/kudu/util/status_callback.h b/be/src/kudu/util/status_callback.h
// new file mode 100644
// index 0000000..3a36b83
// --- /dev/null
// +++ b/be/src/kudu/util/status_callback.h
// @@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef KUDU_UTIL_STATUS_CALLBACK_H
#define KUDU_UTIL_STATUS_CALLBACK_H

#include <string>

#include "kudu/gutil/callback_forward.h"

namespace kudu {

class Status;

// A callback which takes a Status. This is typically used for functions which
// produce asynchronous results and may fail.
+typedef Callback<void(const Status& status)> StatusCallback; + +// To be used when a function signature requires a StatusCallback but none +// is needed. +extern void DoNothingStatusCB(const Status& status); + +// A callback that crashes with a FATAL log message if the given Status is not OK. +extern void CrashIfNotOkStatusCB(const std::string& message, const Status& status); + +// A closure (callback without arguments) that returns a Status indicating +// whether it was successful or not. +typedef Callback<Status(void)> StatusClosure; + +// To be used when setting a StatusClosure is optional. +extern Status DoNothingStatusClosure(); + +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/stopwatch.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/stopwatch.h b/be/src/kudu/util/stopwatch.h new file mode 100644 index 0000000..e86d90c --- /dev/null +++ b/be/src/kudu/util/stopwatch.h @@ -0,0 +1,342 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#ifndef KUDU_UTIL_STOPWATCH_H +#define KUDU_UTIL_STOPWATCH_H + +#include <glog/logging.h> +#include <sys/resource.h> +#include <sys/time.h> +#include <time.h> +#include <string> +#if defined(__APPLE__) +#include <mach/clock.h> +#include <mach/mach.h> +#endif // defined(__APPLE__) + +#include "kudu/gutil/macros.h" +#include "kudu/gutil/stringprintf.h" +#include "kudu/gutil/walltime.h" + +namespace kudu { + +// Macro for logging timing of a block. Usage: +// LOG_TIMING_PREFIX_IF(INFO, FLAGS_should_record_time, "Tablet X: ", "doing some task") { +// ... some task which takes some time +// } +// If FLAGS_should_record_time is true, yields a log like: +// I1102 14:35:51.726186 23082 file.cc:167] Tablet X: Time spent doing some task: +// real 3.729s user 3.570s sys 0.150s +// The task will always execute regardless of whether the timing information is +// printed. +#define LOG_TIMING_PREFIX_IF(severity, condition, prefix, description) \ + for (kudu::sw_internal::LogTiming _l(__FILE__, __LINE__, google::severity, prefix, description, \ + -1, (condition)); !_l.HasRun(); _l.MarkHasRun()) + +// Conditionally log, no prefix. +#define LOG_TIMING_IF(severity, condition, description) \ + LOG_TIMING_PREFIX_IF(severity, (condition), "", (description)) + +// Always log, including prefix. +#define LOG_TIMING_PREFIX(severity, prefix, description) \ + LOG_TIMING_PREFIX_IF(severity, true, (prefix), (description)) + +// Always log, no prefix. +#define LOG_TIMING(severity, description) \ + LOG_TIMING_IF(severity, true, (description)) + +// Macro to log the time spent in the rest of the block. +#define SCOPED_LOG_TIMING(severity, description) \ + kudu::sw_internal::LogTiming VARNAME_LINENUM(_log_timing)(__FILE__, __LINE__, \ + google::severity, "", description, -1, true); + +// Scoped version of LOG_SLOW_EXECUTION(). 
+#define SCOPED_LOG_SLOW_EXECUTION(severity, max_expected_millis, description) \ + kudu::sw_internal::LogTiming VARNAME_LINENUM(_log_timing)(__FILE__, __LINE__, \ + google::severity, "", description, max_expected_millis, true) + +// Scoped version of LOG_SLOW_EXECUTION() but with a prefix. +#define SCOPED_LOG_SLOW_EXECUTION_PREFIX(severity, max_expected_millis, prefix, description) \ + kudu::sw_internal::LogTiming VARNAME_LINENUM(_log_timing)(__FILE__, __LINE__, \ + google::severity, prefix, description, max_expected_millis, true) + +// Macro for logging timing of a block. Usage: +// LOG_SLOW_EXECUTION(INFO, 5, "doing some task") { +// ... some task which takes some time +// } +// when slower than 5 milliseconds, yields a log like: +// I1102 14:35:51.726186 23082 file.cc:167] Time spent doing some task: +// real 3.729s user 3.570s sys 0.150s +#define LOG_SLOW_EXECUTION(severity, max_expected_millis, description) \ + for (kudu::sw_internal::LogTiming _l(__FILE__, __LINE__, google::severity, "", description, \ + max_expected_millis, true); !_l.HasRun(); _l.MarkHasRun()) + +// Macro for vlogging timing of a block. The execution happens regardless of the vlog_level, +// it's only the logging that's affected. +// Usage: +// VLOG_TIMING(1, "doing some task") { +// ... some task which takes some time +// } +// Yields a log just like LOG_TIMING's. +#define VLOG_TIMING(vlog_level, description) \ + for (kudu::sw_internal::LogTiming _l(__FILE__, __LINE__, google::INFO, "", description, \ + -1, VLOG_IS_ON(vlog_level)); !_l.HasRun(); _l.MarkHasRun()) + +// Macro to log the time spent in the rest of the block. 
+#define SCOPED_VLOG_TIMING(vlog_level, description) \ + kudu::sw_internal::LogTiming VARNAME_LINENUM(_log_timing)(__FILE__, __LINE__, \ + google::INFO, "", description, -1, VLOG_IS_ON(vlog_level)); + +#define NANOS_PER_SECOND 1000000000.0 +#define NANOS_PER_MILLISECOND 1000000.0 + +class Stopwatch; + +typedef uint64_t nanosecond_type; + +// Structure which contains an elapsed amount of wall/user/sys time. +struct CpuTimes { + nanosecond_type wall; + nanosecond_type user; + nanosecond_type system; + int64_t context_switches; + + void clear() { wall = user = system = context_switches = 0LL; } + + // Return a string formatted similar to the output of the "time" shell command. + std::string ToString() const { + return StringPrintf( + "real %.3fs\tuser %.3fs\tsys %.3fs", + wall_seconds(), user_cpu_seconds(), system_cpu_seconds()); + } + + double wall_millis() const { + return static_cast<double>(wall) / NANOS_PER_MILLISECOND; + } + + double wall_seconds() const { + return static_cast<double>(wall) / NANOS_PER_SECOND; + } + + double user_cpu_seconds() const { + return static_cast<double>(user) / NANOS_PER_SECOND; + } + + double system_cpu_seconds() const { + return static_cast<double>(system) / NANOS_PER_SECOND; + } +}; + +// A Stopwatch is a convenient way of timing a given operation. +// +// Wall clock time is based on a monotonic timer, so can be reliably used for +// determining durations. +// CPU time is based on either current thread's usage or the usage of the whole +// process, depending on the value of 'Mode' passed to the constructor. +// +// The implementation relies on several syscalls, so should not be used for +// hot paths, but is useful for timing anything on the granularity of seconds +// or more. +// +// NOTE: the user time reported by this class is based on Linux scheduler ticks +// and thus has low precision. Use GetThreadCpuTimeMicros() from walltime.h if +// more accurate per-thread CPU usage timing is required. 
+class Stopwatch { + public: + + enum Mode { + // Collect usage only about the calling thread. + // This may not be supported on older versions of Linux. + THIS_THREAD, + // Collect usage of all threads. + ALL_THREADS + }; + + // Construct a new stopwatch. The stopwatch is initially stopped. + explicit Stopwatch(Mode mode = THIS_THREAD) + : stopped_(true), + mode_(mode) { + times_.clear(); + } + + // Start counting. If the stopwatch is already counting, then resets the + // start point at the current time. + void start() { + stopped_ = false; + GetTimes(×_); + } + + // Stop counting. If the stopwatch is already stopped, has no effect. + void stop() { + if (stopped_) return; + stopped_ = true; + + CpuTimes current; + GetTimes(¤t); + times_.wall = current.wall - times_.wall; + times_.user = current.user - times_.user; + times_.system = current.system - times_.system; + times_.context_switches = current.context_switches - times_.context_switches; + } + + // Return the elapsed amount of time. If the stopwatch is running, then returns + // the amount of time since it was started. If it is stopped, returns the amount + // of time between the most recent start/stop pair. If the stopwatch has never been + // started, the elapsed time is considered to be zero. + CpuTimes elapsed() const { + if (stopped_) return times_; + + CpuTimes current; + GetTimes(¤t); + current.wall -= times_.wall; + current.user -= times_.user; + current.system -= times_.system; + current.context_switches -= times_.context_switches; + return current; + } + + // Resume a stopped stopwatch, such that the elapsed time continues to grow from + // the point where it was last stopped. 
+ // For example: + // Stopwatch s; + // s.start(); + // sleep(1); // elapsed() is now ~1sec + // s.stop(); + // sleep(1); + // s.resume(); + // sleep(1); // elapsed() is now ~2sec + void resume() { + if (!stopped_) return; + + CpuTimes current(times_); + start(); + times_.wall -= current.wall; + times_.user -= current.user; + times_.system -= current.system; + times_.context_switches -= current.context_switches; + } + + bool is_stopped() const { + return stopped_; + } + + private: + void GetTimes(CpuTimes *times) const { + struct rusage usage; + struct timespec wall; + +#if defined(__APPLE__) + if (mode_ == THIS_THREAD) { + //Adapted from http://blog.kuriositaet.de/?p=257. + struct task_basic_info t_info; + mach_msg_type_number_t t_info_count = TASK_BASIC_INFO_COUNT; + CHECK_EQ(KERN_SUCCESS, task_info(mach_task_self(), TASK_THREAD_TIMES_INFO, + (task_info_t)&t_info, &t_info_count)); + usage.ru_utime.tv_sec = t_info.user_time.seconds; + usage.ru_utime.tv_usec = t_info.user_time.microseconds; + usage.ru_stime.tv_sec = t_info.system_time.seconds; + usage.ru_stime.tv_usec = t_info.system_time.microseconds; + } else { + CHECK_EQ(0, getrusage(RUSAGE_SELF, &usage)); + } + + mach_timespec_t ts; + walltime_internal::GetCurrentTime(&ts); + wall.tv_sec = ts.tv_sec; + wall.tv_nsec = ts.tv_nsec; +#else + CHECK_EQ(0, getrusage((mode_ == THIS_THREAD) ? RUSAGE_THREAD : RUSAGE_SELF, &usage)); + CHECK_EQ(0, clock_gettime(CLOCK_MONOTONIC, &wall)); +#endif // defined(__APPLE__) + times->wall = wall.tv_sec * 1000000000L + wall.tv_nsec; + times->user = usage.ru_utime.tv_sec * 1000000000L + usage.ru_utime.tv_usec * 1000L; + times->system = usage.ru_stime.tv_sec * 1000000000L + usage.ru_stime.tv_usec * 1000L; + times->context_switches = usage.ru_nvcsw + usage.ru_nivcsw; + } + + bool stopped_; + + CpuTimes times_; + Mode mode_; +}; + + +namespace sw_internal { + +// Internal class used by the LOG_TIMING macro. 
+class LogTiming { + public: + LogTiming(const char *file, int line, google::LogSeverity severity, + std::string prefix, std::string description, + int64_t max_expected_millis, bool should_print) + : file_(file), + line_(line), + severity_(severity), + prefix_(std::move(prefix)), + description_(std::move(description)), + max_expected_millis_(max_expected_millis), + should_print_(should_print), + has_run_(false) { + stopwatch_.start(); + } + + ~LogTiming() { + if (should_print_) { + Print(max_expected_millis_); + } + } + + // Allows this object to be used as the loop variable in for-loop macros. + // Call HasRun() in the conditional check in the for-loop. + bool HasRun() { + return has_run_; + } + + // Allows this object to be used as the loop variable in for-loop macros. + // Call MarkHasRun() in the "increment" section of the for-loop. + void MarkHasRun() { + has_run_ = true; + } + + private: + Stopwatch stopwatch_; + const char *file_; + const int line_; + const google::LogSeverity severity_; + const string prefix_; + const std::string description_; + const int64_t max_expected_millis_; + const bool should_print_; + bool has_run_; + + // Print if the number of expected millis exceeds the max. + // Passing a negative number implies "always print". 
+ void Print(int64_t max_expected_millis) { + stopwatch_.stop(); + CpuTimes times = stopwatch_.elapsed(); + if (times.wall_millis() > max_expected_millis) { + google::LogMessage(file_, line_, severity_).stream() + << prefix_ << "Time spent " << description_ << ": " + << times.ToString(); + } + } + +}; + +} // namespace sw_internal +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/string_case-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/string_case-test.cc b/be/src/kudu/util/string_case-test.cc new file mode 100644 index 0000000..ae166f5 --- /dev/null +++ b/be/src/kudu/util/string_case-test.cc @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
#include <gtest/gtest.h>

#include "kudu/util/string_case.h"

using std::string;

namespace kudu {

// SnakeToCamelCase: '_' and '-' both act as word separators, and the first
// letter of the input is upper-cased even when there is no separator.
TEST(TestStringCase, TestSnakeToCamel) {
  string out;
  SnakeToCamelCase("foo_bar", &out);
  ASSERT_EQ("FooBar", out);


  SnakeToCamelCase("foo-bar", &out);
  ASSERT_EQ("FooBar", out);

  SnakeToCamelCase("foobar", &out);
  ASSERT_EQ("Foobar", out);
}

// ToUpperCase: letters are upper-cased; spaces and '-' pass through unchanged.
TEST(TestStringCase, TestToUpperCase) {
  string out;
  ToUpperCase(string("foo"), &out);
  ASSERT_EQ("FOO", out);
  ToUpperCase(string("foo bar-BaZ"), &out);
  ASSERT_EQ("FOO BAR-BAZ", out);
}

// ToUpperCase with the same string passed as both input and output.
TEST(TestStringCase, TestToUpperCaseInPlace) {
  string in_out = "foo";
  ToUpperCase(in_out, &in_out);
  ASSERT_EQ("FOO", in_out);
}

// Capitalize: first character upper-cased, every following character
// lower-cased.
TEST(TestStringCase, TestCapitalize) {
  string word = "foo";
  Capitalize(&word);
  ASSERT_EQ("Foo", word);

  word = "HiBerNATe";
  Capitalize(&word);
  ASSERT_EQ("Hibernate", word);
}

} // namespace kudu
// http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/string_case.cc
// ----------------------------------------------------------------------
// diff --git a/be/src/kudu/util/string_case.cc b/be/src/kudu/util/string_case.cc
// new file mode 100644
// index 0000000..141cdc5
// --- /dev/null
// +++ b/be/src/kudu/util/string_case.cc
// @@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/string_case.h" + +#include <glog/logging.h> +#include <ctype.h> + +namespace kudu { + +using std::string; + +void SnakeToCamelCase(const std::string &snake_case, + std::string *camel_case) { + DCHECK_NE(camel_case, &snake_case) << "Does not support in-place operation"; + camel_case->clear(); + camel_case->reserve(snake_case.size()); + + bool uppercase_next = true; + for (char c : snake_case) { + if ((c == '_') || + (c == '-')) { + uppercase_next = true; + continue; + } + if (uppercase_next) { + camel_case->push_back(toupper(c)); + } else { + camel_case->push_back(c); + } + uppercase_next = false; + } +} + +void ToUpperCase(const std::string &string, + std::string *out) { + if (out != &string) { + *out = string; + } + + for (char& c : *out) { + c = toupper(c); + } +} + +void Capitalize(string *word) { + uint32_t size = word->size(); + if (size == 0) { + return; + } + + (*word)[0] = toupper((*word)[0]); + + for (int i = 1; i < size; i++) { + (*word)[i] = tolower((*word)[i]); + } +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/string_case.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/string_case.h b/be/src/kudu/util/string_case.h new file mode 100644 index 0000000..98f5828 --- /dev/null +++ b/be/src/kudu/util/string_case.h @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Utility methods for dealing with string case. +#ifndef KUDU_UTIL_STRING_CASE_H +#define KUDU_UTIL_STRING_CASE_H + +#include <string> + +namespace kudu { + +// Convert the given snake_case string to camel case. +// Also treats '-' in a string like a '_' +// For example: +// - 'foo_bar' -> FooBar +// - 'foo-bar' -> FooBar +// +// This function cannot operate in-place -- i.e. 'camel_case' must not +// point to 'snake_case'. +void SnakeToCamelCase(const std::string &snake_case, + std::string *camel_case); + +// Upper-case all of the characters in the given string. +// 'string' and 'out' may refer to the same string to replace in-place. +void ToUpperCase(const std::string &string, + std::string *out); + +// Capitalizes a string containing a word in place. +// For example: +// - 'hiBerNATe' -> 'Hibernate' +void Capitalize(std::string *word); + +} // namespace kudu +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/striped64-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/striped64-test.cc b/be/src/kudu/util/striped64-test.cc new file mode 100644 index 0000000..fee07ca --- /dev/null +++ b/be/src/kudu/util/striped64-test.cc @@ -0,0 +1,152 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <glog/logging.h> +#include <gtest/gtest.h> +#include <memory> + +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/monotime.h" +#include "kudu/util/striped64.h" +#include "kudu/util/test_util.h" +#include "kudu/util/thread.h" + +// These flags are used by the multi-threaded tests, can be used for microbenchmarking. +DEFINE_int32(num_operations, 10*1000, "Number of operations to perform"); +DEFINE_int32(num_threads, 2, "Number of worker threads"); + +namespace kudu { + +// Test some basic operations +TEST(Striped64Test, TestBasic) { + LongAdder adder; + ASSERT_EQ(adder.Value(), 0); + adder.IncrementBy(100); + ASSERT_EQ(adder.Value(), 100); + adder.Increment(); + ASSERT_EQ(adder.Value(), 101); + adder.Decrement(); + ASSERT_EQ(adder.Value(), 100); + adder.IncrementBy(-200); + ASSERT_EQ(adder.Value(), -100); + adder.Reset(); + ASSERT_EQ(adder.Value(), 0); +} + +template <class Adder> +class MultiThreadTest { + public: + typedef std::vector<scoped_refptr<Thread> > thread_vec_t; + + MultiThreadTest(int64_t num_operations, int64_t num_threads) + : num_operations_(num_operations), + num_threads_(num_threads) { + } + + void IncrementerThread(const int64_t num) { + for (int i = 0; i < num; i++) { + adder_.Increment(); + } + } + + void DecrementerThread(const int64_t num) { + for (int i = 0; i < num; i++) { + adder_.Decrement(); + } + } + + void Run() { + // Increment + 
for (int i = 0; i < num_threads_; i++) { + scoped_refptr<Thread> ref; + Thread::Create("Striped64", "Incrementer", &MultiThreadTest::IncrementerThread, this, + num_operations_, &ref); + threads_.push_back(ref); + } + for (const scoped_refptr<Thread> &t : threads_) { + t->Join(); + } + ASSERT_EQ(num_threads_*num_operations_, adder_.Value()); + threads_.clear(); + + // Decrement back to zero + for (int i = 0; i < num_threads_; i++) { + scoped_refptr<Thread> ref; + Thread::Create("Striped64", "Decrementer", &MultiThreadTest::DecrementerThread, this, + num_operations_, &ref); + threads_.push_back(ref); + } + for (const scoped_refptr<Thread> &t : threads_) { + t->Join(); + } + ASSERT_EQ(0, adder_.Value()); + } + + Adder adder_; + + int64_t num_operations_; + // This is rounded down to the nearest even number + int32_t num_threads_; + thread_vec_t threads_; +}; + +// Test adder implemented by a single AtomicInt for comparison +class BasicAdder { + public: + BasicAdder() : value_(0) {} + void IncrementBy(int64_t x) { value_.IncrementBy(x); } + inline void Increment() { IncrementBy(1); } + inline void Decrement() { IncrementBy(-1); } + int64_t Value() { return value_.Load(); } + private: + AtomicInt<int64_t> value_; +}; + +void RunMultiTest(int64_t num_operations, int64_t num_threads) { + MonoTime start = MonoTime::Now(); + MultiThreadTest<BasicAdder> basicTest(num_operations, num_threads); + basicTest.Run(); + MonoTime end1 = MonoTime::Now(); + MultiThreadTest<LongAdder> test(num_operations, num_threads); + test.Run(); + MonoTime end2 = MonoTime::Now(); + MonoDelta basic = end1 - start; + MonoDelta striped = end2 - end1; + LOG(INFO) << "Basic counter took " << basic.ToMilliseconds() << "ms."; + LOG(INFO) << "Striped counter took " << striped.ToMilliseconds() << "ms."; +} + +// Compare a single-thread workload. Demonstrates the overhead of LongAdder over AtomicInt. 
+TEST(Striped64Test, TestSingleIncrDecr) { + OverrideFlagForSlowTests( + "num_operations", + strings::Substitute("$0", (FLAGS_num_operations * 100))); + RunMultiTest(FLAGS_num_operations, 1); +} + +// Compare a multi-threaded workload. LongAdder should show improvements here. +TEST(Striped64Test, TestMultiIncrDecr) { + OverrideFlagForSlowTests( + "num_operations", + strings::Substitute("$0", (FLAGS_num_operations * 100))); + OverrideFlagForSlowTests( + "num_threads", + strings::Substitute("$0", (FLAGS_num_threads * 4))); + RunMultiTest(FLAGS_num_operations, FLAGS_num_threads); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/striped64.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/striped64.cc b/be/src/kudu/util/striped64.cc new file mode 100644 index 0000000..8343177 --- /dev/null +++ b/be/src/kudu/util/striped64.cc @@ -0,0 +1,177 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/striped64.h" + +#include "kudu/util/monotime.h" +#include "kudu/util/random.h" +#include "kudu/util/threadlocal.h" + +using kudu::striped64::internal::Cell; + +namespace kudu { + +namespace striped64 { +namespace internal { + +// +// Cell +// + +Cell::Cell() + : value_(0) { +} +} // namespace internal +} // namespace striped64 + +// +// Striped64 +// +__thread uint64_t Striped64::tls_hashcode_ = 0; +const uint32_t Striped64::kNumCpus = sysconf(_SC_NPROCESSORS_ONLN); + +Striped64::Striped64() + : busy_(false), + cell_buffer_(nullptr), + cells_(nullptr), + num_cells_(0) { +} + +uint64_t Striped64::get_tls_hashcode() { + if (PREDICT_FALSE(tls_hashcode_ == 0)) { + Random r((MonoTime::Now() - MonoTime::Min()).ToNanoseconds()); + const uint64_t hash = r.Next64(); + // Avoid zero to allow xorShift rehash, and because 0 indicates an unset + // hashcode above. + tls_hashcode_ = (hash == 0) ? 1 : hash; + } + return tls_hashcode_; +} + + +Striped64::~Striped64() { + // Cell is a POD, so no need to destruct each one. + free(cell_buffer_); +} + +void Striped64::RetryUpdate(int64_t x, Rehash to_rehash) { + uint64_t h = get_tls_hashcode(); + // There are three operations in this loop. + // + // 1. Try to add to the Cell hash table entry for the thread if the table exists. + // When there's contention, rehash to try a different Cell. + // 2. Try to initialize the hash table. + // 3. Try to update the base counter. + // + // These are predicated on successful CAS operations, which is why it's all wrapped in an + // infinite retry loop. + while (true) { + int32_t n = base::subtle::Acquire_Load(&num_cells_); + if (n > 0) { + if (to_rehash == kRehash) { + // CAS failed already, rehash before trying to increment. + to_rehash = kNoRehash; + } else { + Cell *cell = &(cells_[(n - 1) & h]); + int64_t v = cell->value_.Load(); + if (cell->CompareAndSet(v, Fn(v, x))) { + // Successfully CAS'd the corresponding cell, done. 
+ break; + } + } + // Rehash since we failed to CAS, either previously or just now. + h ^= h << 13; + h ^= h >> 17; + h ^= h << 5; + } else if (n == 0 && CasBusy()) { + // We think table hasn't been initialized yet, try to do so. + // Recheck preconditions, someone else might have init'd in the meantime. + n = base::subtle::Acquire_Load(&num_cells_); + if (n == 0) { + n = 1; + // Calculate the size. Nearest power of two >= NCPU. + // Also handle a negative NCPU, can happen if sysconf name is unknown + while (kNumCpus > n) { + n <<= 1; + } + // Allocate cache-aligned memory for use by the cells_ table. + int err = posix_memalign(&cell_buffer_, CACHELINE_SIZE, sizeof(Cell)*n); + CHECK_EQ(0, err) << "error calling posix_memalign" << std::endl; + // Initialize the table + cells_ = new (cell_buffer_) Cell[n]; + base::subtle::Release_Store(&num_cells_, n); + } + // End critical section + busy_.Store(0); + } else { + // Fallback to adding to the base value. + // Means the table wasn't initialized or we failed to init it. + int64_t v = base_.value_.Load(); + if (CasBase(v, Fn(v, x))) { + break; + } + } + } + // Record index for next time + tls_hashcode_ = h; +} + +void Striped64::InternalReset(int64_t initialValue) { + const int32_t n = base::subtle::Acquire_Load(&num_cells_); + base_.value_.Store(initialValue); + for (int i = 0; i < n; i++) { + cells_[i].value_.Store(initialValue); + } +} + +void LongAdder::IncrementBy(int64_t x) { + // Use hash table if present. If that fails, call RetryUpdate to rehash and retry. + // If no hash table, try to CAS the base counter. If that fails, RetryUpdate to init the table. 
+ const int32_t n = base::subtle::Acquire_Load(&num_cells_); + if (n > 0) { + Cell *cell = &(cells_[(n - 1) & get_tls_hashcode()]); + DCHECK_EQ(0, reinterpret_cast<const uintptr_t>(cell) & (sizeof(Cell) - 1)) + << " unaligned Cell not allowed for Striped64" << std::endl; + const int64_t old = cell->value_.Load(); + if (!cell->CompareAndSet(old, old + x)) { + // When we hit a hash table contention, signal RetryUpdate to rehash. + RetryUpdate(x, kRehash); + } + } else { + int64_t b = base_.value_.Load(); + if (!base_.CompareAndSet(b, b + x)) { + // Attempt to initialize the table. No need to rehash since the contention was for the + // base counter, not the hash table. + RetryUpdate(x, kNoRehash); + } + } +} + +// +// LongAdder +// + +int64_t LongAdder::Value() const { + int64_t sum = base_.value_.Load(); + const int32_t n = base::subtle::Acquire_Load(&num_cells_); + for (int i = 0; i < n; i++) { + sum += cells_[i].value_.Load(); + } + return sum; +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/striped64.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/striped64.h b/be/src/kudu/util/striped64.h new file mode 100644 index 0000000..a3b829b --- /dev/null +++ b/be/src/kudu/util/striped64.h @@ -0,0 +1,174 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef KUDU_UTIL_STRIPED64_H_ +#define KUDU_UTIL_STRIPED64_H_ + +#include "kudu/gutil/port.h" +#include "kudu/util/atomic.h" +#include "kudu/util/threadlocal.h" + +namespace kudu { + +class Striped64; + +namespace striped64 { +namespace internal { + +#define ATOMIC_INT_SIZE sizeof(AtomicInt<int64_t>) +// Padded POD container for AtomicInt. This prevents false sharing of cache lines. +class Cell { + public: + Cell(); + inline bool CompareAndSet(int64_t cmp, int64_t value) { + return value_.CompareAndSet(cmp, value); + } + + // Padding advice from Herb Sutter: + // http://www.drdobbs.com/parallel/eliminate-false-sharing/217500206?pgno=4 + AtomicInt<int64_t> value_; + char pad[CACHELINE_SIZE > ATOMIC_INT_SIZE ? + CACHELINE_SIZE - ATOMIC_INT_SIZE : 1]; + + DISALLOW_COPY_AND_ASSIGN(Cell); +} CACHELINE_ALIGNED; +#undef ATOMIC_INT_SIZE + +} // namespace internal +} // namespace striped64 + +// This set of classes is heavily derived from JSR166e, released into the public domain +// by Doug Lea and the other authors. +// +// See: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/Striped64.java?view=co +// See: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/LongAdder.java?view=co +// +// The Striped64 and LongAdder implementations here are simplified versions of what's present in +// JSR166e. However, the core ideas remain the same. +// +// Updating a single AtomicInteger in a multi-threaded environment can be quite slow: +// +// 1. False sharing of cache lines with other counters. +// 2. 
Cache line bouncing from high update rates, especially with many cores. +// +// These two problems are addressed by Striped64. When there is no contention, it uses CAS on a +// single base counter to store updates. However, when Striped64 detects contention +// (via a failed CAS operation), it will allocate a small, fixed size hashtable of Cells. +// A Cell is a simple POD that pads out an AtomicInt to 64 bytes to prevent +// sharing a cache line. +// +// Reading the value of a Striped64 requires traversing the hashtable to calculate the true sum. +// +// Each updating thread uses a thread-local hashcode to determine its Cell in the hashtable. +// If a thread fails to CAS its hashed Cell, it will do a lightweight rehash operation to try +// and find an uncontended bucket. Because the hashcode is thread-local, this rehash affects all +// Striped64's accessed by the thread. This is good, since contention on one Striped64 is +// indicative of contention elsewhere too. +// +// The hashtable is statically sized to the nearest power of 2 greater than or equal to the +// number of CPUs. This is sufficient, since this guarantees the existence of a perfect hash +// function. Due to the random rehashing, the threads should eventually converge to this function. +// In practice, this scheme has shown to be sufficient. +// +// The biggest simplification of this implementation compared to JSR166e is that we do not +// dynamically grow the table, instead immediately allocating it to the full size. +// We also do not lazily allocate each Cell, instead allocating the entire array at once. +// This means we waste some additional memory in low contention scenarios, and initial allocation +// will also be slower. Some of the micro-optimizations were also elided for readability. +class Striped64 { + public: + Striped64(); + virtual ~Striped64(); + + protected: + + enum Rehash { + kRehash, + kNoRehash + }; + + // CAS the base field. 
+ bool CasBase(int64_t cmp, int64_t val) { return base_.CompareAndSet(cmp, val); } + + // CAS the busy field from 0 to 1 to acquire the lock. + bool CasBusy() { return busy_.CompareAndSet(0, 1); } + + // Computes the function of the current and new value. Used in RetryUpdate. + virtual int64_t Fn(int64_t current_value, int64_t new_value) = 0; + + // Handles cases of updates involving initialization, resizing, creating new Cells, and/or + // contention. See above for further explanation. + void RetryUpdate(int64_t x, Rehash to_rehash); + + // Sets base and all cells to the given value. + void InternalReset(int64_t initialValue); + + // Base value, used mainly when there is no contention, but also as a fallback during + // table initialization races. Updated via CAS. + striped64::internal::Cell base_; + + // CAS lock used when resizing and/or creating cells. + AtomicBool busy_; + + // Backing buffer for cells_, used for alignment. + void* cell_buffer_; + + // Table of cells. When non-null, size is the nearest power of 2 >= NCPU. + striped64::internal::Cell* cells_; + int32_t num_cells_; + + protected: + static uint64_t get_tls_hashcode(); + + private: + // Static hash code per-thread. Shared across all instances to limit thread-local pollution. + // Also, if a thread hits a collision on one Striped64, it's also likely to collide on + // other Striped64s too. + static __thread uint64_t tls_hashcode_; + + // Number of CPUs, to place bound on table size. + static const uint32_t kNumCpus; + +}; + +// A 64-bit number optimized for high-volume concurrent updates. +// See Striped64 for a longer explanation of the inner workings. +class LongAdder : Striped64 { + public: + LongAdder() {} + void IncrementBy(int64_t x); + void Increment() { IncrementBy(1); } + void Decrement() { IncrementBy(-1); } + + // Returns the current value. + // Note this is not an atomic snapshot in the presence of concurrent updates. + int64_t Value() const; + + // Resets the counter state to zero. 
+ void Reset() { InternalReset(0); } + + private: + int64_t Fn(int64_t current_value, int64_t new_value) override { + return current_value + new_value; + } + + DISALLOW_COPY_AND_ASSIGN(LongAdder); +}; + +} // namespace kudu + +#endif
