http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/cow_object.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/cow_object.h b/be/src/kudu/util/cow_object.h new file mode 100644 index 0000000..159a8bb --- /dev/null +++ b/be/src/kudu/util/cow_object.h @@ -0,0 +1,437 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include <algorithm> // IWYU pragma: keep +#include <map> +#include <memory> +#include <ostream> + +#include <glog/logging.h> + +#include "kudu/gutil/macros.h" +#include "kudu/util/rwc_lock.h" + +namespace kudu { + +// An object which manages its state via copy-on-write. +// +// Access to this object can be done more conveniently using the +// CowLock template class defined below. +// +// The 'State' template parameter must be swappable using std::swap. +template<class State> +class CowObject { + public: + CowObject() {} + ~CowObject() {} + + // Lock an object for read. + // + // While locked, a mutator will be blocked when trying to commit its mutation. + void ReadLock() const { + lock_.ReadLock(); + } + + // Return whether the object is locked for read. + bool IsReadLocked() const { + return lock_.HasReaders(); + } + + // Unlock an object previously locked for read, unblocking a mutator + // actively trying to commit its mutation. + void ReadUnlock() const { + lock_.ReadUnlock(); + } + + // Lock the object for write (preventing concurrent mutators). + // + // We defer making a dirty copy of the state to mutable_dirty() so that the + // copy can be avoided if no dirty changes are actually made. + void StartMutation() { + lock_.WriteLock(); + } + + // Return whether the object is locked for read and write. + bool IsWriteLocked() const { + return lock_.HasWriteLock(); + } + + // Abort the current mutation. This drops the write lock without applying any + // changes made to the mutable copy. + void AbortMutation() { + DCHECK(lock_.HasWriteLock()); + dirty_state_.reset(); + lock_.WriteUnlock(); + } + + // Commit the current mutation. This escalates to the "Commit" lock, which + // blocks any concurrent readers or writers, swaps in the new version of the + // State, and then drops the commit lock. + void CommitMutation() { + DCHECK(lock_.HasWriteLock()); + if (!dirty_state_) { + AbortMutation(); + return; + } + lock_.UpgradeToCommitLock(); + std::swap(state_, *dirty_state_); + dirty_state_.reset(); + lock_.CommitUnlock(); + } + + // Return the current state, not reflecting any in-progress mutations. 
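+  // The caller must hold either the read lock or the write lock while using
+  // the returned reference; the DCHECKs below enforce this.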
+  State& state() {
+    DCHECK(lock_.HasReaders() || lock_.HasWriteLock());
+    return state_;
+  }
+
+  const State& state() const {
+    DCHECK(lock_.HasReaders() || lock_.HasWriteLock());
+    return state_;
+  }
+
+  // Returns the current dirty state (i.e. reflecting in-progress mutations).
+  // Should only be called by the thread that previously called StartMutation().
+  State* mutable_dirty() {
+    DCHECK(lock_.HasWriteLock());
+    if (!dirty_state_) {
+      dirty_state_.reset(new State(state_));
+    }
+    return dirty_state_.get();
+  }
+
+  const State& dirty() const {
+    DCHECK(lock_.HasWriteLock());
+    if (!dirty_state_) {
+      return state_;
+    }
+    return *dirty_state_.get();
+  }
+
+ private:
+  mutable RWCLock lock_;
+
+  State state_;
+  std::unique_ptr<State> dirty_state_;
+
+  DISALLOW_COPY_AND_ASSIGN(CowObject);
+};
+
+// Lock state for the following lock-guard-like classes.
+enum class LockMode {
+  // The lock is held for reading.
+  READ,
+
+  // The lock is held for reading and writing.
+  WRITE,
+
+  // The lock is not held.
+  RELEASED
+};
+
+// Defined so LockMode is compatible with DCHECK and the like.
+std::ostream& operator<<(std::ostream& o, LockMode m);
+
+// A lock-guard-like scoped object to acquire the lock on a CowObject,
+// and obtain a pointer to the correct copy to read/write.
+//
+// Example usage:
+//
+//   CowObject<Foo> my_obj;
+//   {
+//     CowLock<Foo> l(&my_obj, LockMode::READ);
+//     l.data().get_foo();
+//     ...
+//   }
+//   {
+//     CowLock<Foo> l(&my_obj, LockMode::WRITE);
+//     l.mutable_data()->set_foo(...);
+//     ...
+//     l.Commit();
+//   }
+template<class State>
+class CowLock {
+ public:
+
+  // An unlocked CowLock. This is useful for default constructing a lock to be
+  // moved in to.
+  CowLock()
+    : cow_(nullptr),
+      mode_(LockMode::RELEASED) {
+  }
+
+  // Lock in either read or write mode.
+  CowLock(CowObject<State>* cow,
+          LockMode mode)
+    : cow_(cow),
+      mode_(mode) {
+    switch (mode) {
+      case LockMode::READ: cow_->ReadLock(); break;
+      case LockMode::WRITE: cow_->StartMutation(); break;
+      default: LOG(FATAL) << "Cannot lock in mode " << mode;
+    }
+  }
+
+  // Lock in read mode.
+  // A const object may not be locked in write mode.
+  CowLock(const CowObject<State>* info,
+          LockMode mode)
+    : cow_(const_cast<CowObject<State>*>(info)),
+      mode_(mode) {
+    switch (mode) {
+      case LockMode::READ: cow_->ReadLock(); break;
+      case LockMode::WRITE: LOG(FATAL) << "Cannot write-lock a const pointer";
+      default: LOG(FATAL) << "Cannot lock in mode " << mode;
+    }
+  }
+
+  // Disable copying.
+  CowLock(const CowLock&) = delete;
+  CowLock& operator=(const CowLock&) = delete;
+
+  // Allow moving.
+  CowLock(CowLock&& other) noexcept
+    : cow_(other.cow_),
+      mode_(other.mode_) {
+    other.cow_ = nullptr;
+    other.mode_ = LockMode::RELEASED;
+  }
+  CowLock& operator=(CowLock&& other) noexcept {
+    cow_ = other.cow_;
+    mode_ = other.mode_;
+    other.cow_ = nullptr;
+    other.mode_ = LockMode::RELEASED;
+    return *this;
+  }
+
+  // Commit the underlying object.
+  // Requires that the caller hold the lock in write mode.
+  void Commit() {
+    DCHECK_EQ(LockMode::WRITE, mode_);
+    cow_->CommitMutation();
+    mode_ = LockMode::RELEASED;
+  }
+
+  void Unlock() {
+    switch (mode_) {
+      case LockMode::READ: cow_->ReadUnlock(); break;
+      case LockMode::WRITE: cow_->AbortMutation(); break;
+      default: DCHECK_EQ(LockMode::RELEASED, mode_); break;
+    }
+    mode_ = LockMode::RELEASED;
+  }
+
+  // Obtain the underlying data. In WRITE mode, this returns the
+  // same data as mutable_data() (not the safe unchanging copy).
+  const State& data() const {
+    switch (mode_) {
+      case LockMode::READ: return cow_->state();
+      case LockMode::WRITE: return cow_->dirty();
+      default: LOG(FATAL) << "Cannot access data after committing";
+    }
+  }
+
+  // Obtain the mutable data. This may only be called in WRITE mode.
+  State* mutable_data() {
+    switch (mode_) {
+      case LockMode::READ: LOG(FATAL) << "Cannot mutate data with READ lock";
+      case LockMode::WRITE: return cow_->mutable_dirty();
+      default: LOG(FATAL) << "Cannot access data after committing";
+    }
+  }
+
+  bool is_write_locked() const {
+    return mode_ == LockMode::WRITE;
+  }
+
+  // Drop the lock. If the lock is held in WRITE mode, and the
+  // lock has not yet been released, aborts the mutation, restoring
+  // the underlying object to its original data.
+  ~CowLock() {
+    Unlock();
+  }
+
+ private:
+  CowObject<State>* cow_;
+  LockMode mode_;
+};
+
+// Scoped object that locks multiple CowObjects for reading or for writing.
+// When locked for writing and mutations are completed, can also commit those
+// mutations, which releases the lock.
+//
+// CowObjects are stored in an std::map, which provides two important properties:
+// 1. AddObject() can deduplicate CowObjects already inserted.
+// 2. When locking for writing, the deterministic iteration order provided by
+//    std::map prevents deadlocks.
+//
+// The use of std::map forces callers to provide a key for each CowObject. For
+// a key implementation to be usable, an appropriate overload of operator<
+// must be available.
+//
+// Unlike CowLock, does not mediate access to the CowObject data itself;
+// callers should access the data out of band.
+//
+// Sample usage:
+//
+//   struct Foo {
+//     string id_;
+//     string data_;
+//   };
+//
+//   vector<CowObject<Foo>> foos;
+//
+// 1. Locking a group of CowObjects for reading:
+//
+//   CowGroupLock<string, Foo> l(LockMode::RELEASED);
+//   for (const auto& f : foos) {
+//     l.AddObject(f.id_, &f);
+//   }
+//   l.Lock(LockMode::READ);
+//   for (const auto& f : foos) {
+//     cout << f.state().data_ << endl;
+//   }
+//   l.Unlock();
+//
+// 2. Tracking already-write-locked CowObjects for group commit:
+//
+//   CowGroupLock<string, Foo> l(LockMode::WRITE);
+//   for (auto& f : foos) {
+//     l.AddObject(f.id_, &f);
+//     f.mutable_dirty()->data_ = "modified";
+//   }
+//   l.Commit();
+//
+// 3. Aggregating unlocked CowObjects, locking them safely, and committing
+//    them together:
+//
+//   CowGroupLock<string, Foo> l(LockMode::RELEASED);
+//   for (const auto& f : foos) {
+//     l.AddObject(f.id_, &f);
+//   }
+//   l.Lock(LockMode::WRITE);
+//   for (auto& f : foos) {
+//     f.mutable_dirty()->data_ = "modified";
+//   }
+//   l.Commit();
+template<class Key, class Value>
+class CowGroupLock {
+ public:
+  explicit CowGroupLock(LockMode mode)
+    : mode_(mode) {
+  }
+
+  ~CowGroupLock() {
+    Unlock();
+  }
+
+  void Unlock() {
+    switch (mode_) {
+      case LockMode::READ:
+        for (const auto& e : cows_) {
+          e.second->ReadUnlock();
+        }
+        break;
+      case LockMode::WRITE:
+        for (const auto& e : cows_) {
+          e.second->AbortMutation();
+        }
+        break;
+      default:
+        DCHECK_EQ(LockMode::RELEASED, mode_);
+        break;
+    }
+
+    cows_.clear();
+    mode_ = LockMode::RELEASED;
+  }
+
+  void Lock(LockMode new_mode) {
+    DCHECK_EQ(LockMode::RELEASED, mode_);
+
+    switch (new_mode) {
+      case LockMode::READ:
+        for (const auto& e : cows_) {
+          e.second->ReadLock();
+        }
+        break;
+      case LockMode::WRITE:
+        for (const auto& e : cows_) {
+          e.second->StartMutation();
+        }
+        break;
+      default:
+        LOG(FATAL) << "Cannot lock in mode " << new_mode;
+    }
+    mode_ = new_mode;
+  }
+
+  void Commit() {
+    DCHECK_EQ(LockMode::WRITE, mode_);
+    for (const auto& e : cows_) {
+      e.second->CommitMutation();
+    }
+    cows_.clear();
+    mode_ = LockMode::RELEASED;
+  }
+
+  // Adds a new CowObject to be tracked by the lock guard. Does nothing if a
+  // CowObject with the same key was already added.
+  //
+  // It is the responsibility of the caller to ensure:
+  // 1. That 'object' remains alive until the lock is released.
+  // 2. That if 'object' was already added, both objects point to the same
+  //    memory address.
+  // 3. That if the CowGroupLock is already locked in a particular mode,
+  //    'object' is also already locked in that mode.
+  void AddObject(Key key, const CowObject<Value>* object) {
+    AssertObjectLocked(object);
+    auto r = cows_.emplace(std::move(key), const_cast<CowObject<Value>*>(object));
+    DCHECK_EQ(r.first->second, object);
+  }
+
+  // Like the above, but for mutable objects.
+  void AddMutableObject(Key key, CowObject<Value>* object) {
+    AssertObjectLocked(object);
+    auto r = cows_.emplace(std::move(key), object);
+    DCHECK_EQ(r.first->second, object);
+  }
+
+ private:
+  void AssertObjectLocked(const CowObject<Value>* object) const {
+#ifndef NDEBUG
+    switch (mode_) {
+      case LockMode::READ:
+        DCHECK(object->IsReadLocked());
+        break;
+      case LockMode::WRITE:
+        DCHECK(object->IsWriteLocked());
+        break;
+      default:
+        DCHECK_EQ(LockMode::RELEASED, mode_);
+        break;
+    }
+#endif
+  }
+
+  std::map<Key, CowObject<Value>*> cows_;
+  LockMode mode_;
+
+  DISALLOW_COPY_AND_ASSIGN(CowGroupLock);
+};
+
+} // namespace kudu
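To make the locking protocol above concrete, here is a minimal, self-contained sketch of the read/modify/commit cycle through the scoped CowLock guard. The PersistentFoo struct and Example() function are illustrative only, not part of this patch:

#include <iostream>
#include <string>

#include "kudu/util/cow_object.h"

struct PersistentFoo {
  std::string name;
};

void Example() {
  kudu::CowObject<PersistentFoo> foo;

  // Writer: mutations go to a lazily-created dirty copy until Commit().
  {
    kudu::CowLock<PersistentFoo> l(&foo, kudu::LockMode::WRITE);
    l.mutable_data()->name = "updated";  // First call clones the committed state.
    l.Commit();                          // Swaps the dirty copy in, drops the lock.
  }

  // Reader: sees only committed state. Readers block a writer only at
  // CommitMutation() time, not while it is preparing its dirty copy.
  {
    kudu::CowLock<PersistentFoo> l(&foo, kudu::LockMode::READ);
    std::cout << l.data().name << std::endl;
  }
  // A CowLock that goes out of scope without Commit() aborts the mutation
  // (WRITE mode) or simply drops the read lock (READ mode).
}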
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/crc-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/crc-test.cc b/be/src/kudu/util/crc-test.cc new file mode 100644 index 0000000..cf13268 --- /dev/null +++ b/be/src/kudu/util/crc-test.cc @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <cstdint> +#include <cstring> +#include <ostream> +#include <string> + +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/strings/numbers.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/crc.h" +#include "kudu/util/stopwatch.h" +#include "kudu/util/test_util.h" + +namespace kudu { +namespace crc { + +using strings::Substitute; + +class CrcTest : public KuduTest { + protected: + + // Returns pointer to data which must be deleted by caller. + static void GenerateBenchmarkData(const uint8_t** bufptr, size_t* buflen) { + const uint32_t kNumNumbers = 1000000; + const uint32_t kBytesPerNumber = sizeof(uint32_t); + const uint32_t kLength = kNumNumbers * kBytesPerNumber; + auto buf = new uint8_t[kLength]; + for (uint32_t i = 0; i < kNumNumbers; i++) { + memcpy(buf + (i * kBytesPerNumber), &i, kBytesPerNumber); + } + *bufptr = buf; + *buflen = kLength; + } + +}; + +// Basic functionality test. +TEST_F(CrcTest, TestCRC32C) { + const std::string test_data("abcdefgh"); + const uint64_t kExpectedCrc = 0xa9421b7; // Known value from crcutil usage test program. + + Crc* crc32c = GetCrc32cInstance(); + uint64_t data_crc = 0; + crc32c->Compute(test_data.data(), test_data.length(), &data_crc); + char buf[kFastToBufferSize]; + const char* output = FastHex64ToBuffer(data_crc, buf); + LOG(INFO) << "CRC32C of " << test_data << " is: 0x" << output << " (full 64 bits)"; + output = FastHex32ToBuffer(static_cast<uint32_t>(data_crc), buf); + LOG(INFO) << "CRC32C of " << test_data << " is: 0x" << output << " (truncated 32 bits)"; + ASSERT_EQ(kExpectedCrc, data_crc); + + // Using helper + uint64_t data_crc2 = Crc32c(test_data.data(), test_data.length()); + ASSERT_EQ(kExpectedCrc, data_crc2); + + // Using multiple chunks + size_t half_length = test_data.length() / 2; + uint64_t data_crc3 = Crc32c(test_data.data(), half_length); + data_crc3 = Crc32c(test_data.data() + half_length, half_length, data_crc3); + ASSERT_EQ(kExpectedCrc, data_crc3); +} + +// Simple benchmark of CRC32C throughput. +// We should expect about 8 bytes per cycle in throughput on a single core. 
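As the chunked portion of TestCRC32C above demonstrates, the three-argument Crc32c() overload extends a previously computed checksum. Here is a hedged sketch of checksumming a buffer in fixed-size pieces; ChecksumInChunks is a hypothetical helper, not part of the patch, and its result matches a single-shot Crc32c() over the whole buffer. The benchmark that follows measures the raw throughput of the underlying Compute() call.

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <string>

#include "kudu/util/crc.h"

// Requires chunk_size > 0.
uint32_t ChecksumInChunks(const std::string& payload, size_t chunk_size) {
  // Seed with the first chunk, then extend the running CRC piece by piece.
  size_t first = std::min(chunk_size, payload.size());
  uint32_t crc = kudu::crc::Crc32c(payload.data(), first);
  for (size_t off = first; off < payload.size(); off += chunk_size) {
    size_t len = std::min(chunk_size, payload.size() - off);
    crc = kudu::crc::Crc32c(payload.data() + off, len, crc);
  }
  return crc;
}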
+TEST_F(CrcTest, BenchmarkCRC32C) { + gscoped_ptr<const uint8_t[]> data; + const uint8_t* buf; + size_t buflen; + GenerateBenchmarkData(&buf, &buflen); + data.reset(buf); + Crc* crc32c = GetCrc32cInstance(); + int kNumRuns = 1000; + if (AllowSlowTests()) { + kNumRuns = 40000; + } + const uint64_t kNumBytes = kNumRuns * buflen; + Stopwatch sw; + sw.start(); + for (int i = 0; i < kNumRuns; i++) { + uint64_t cksum; + crc32c->Compute(buf, buflen, &cksum); + } + sw.stop(); + CpuTimes elapsed = sw.elapsed(); + LOG(INFO) << Substitute("$0 runs of CRC32C on $1 bytes of data (total: $2 bytes)" + " in $3 seconds; $4 bytes per millisecond, $5 bytes per nanosecond!", + kNumRuns, buflen, kNumBytes, elapsed.wall_seconds(), + (kNumBytes / elapsed.wall_millis()), + (kNumBytes / elapsed.wall)); +} + +} // namespace crc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/crc.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/crc.cc b/be/src/kudu/util/crc.cc new file mode 100644 index 0000000..1534b8d --- /dev/null +++ b/be/src/kudu/util/crc.cc @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "kudu/util/crc.h" + +#include <crcutil/interface.h> + +#include "kudu/gutil/once.h" +#include "kudu/util/debug/leakcheck_disabler.h" + +namespace kudu { +namespace crc { + +using debug::ScopedLeakCheckDisabler; + +static GoogleOnceType crc32c_once = GOOGLE_ONCE_INIT; +static Crc* crc32c_instance = nullptr; + +static void InitCrc32cInstance() { + ScopedLeakCheckDisabler disabler; // CRC instance is never freed. + // TODO: Is initial = 0 and roll window = 4 appropriate for all cases? + crc32c_instance = crcutil_interface::CRC::CreateCrc32c(true, 0, 4, nullptr); +} + +Crc* GetCrc32cInstance() { + GoogleOnceInit(&crc32c_once, &InitCrc32cInstance); + return crc32c_instance; +} + +uint32_t Crc32c(const void* data, size_t length) { + uint64_t crc32 = 0; + GetCrc32cInstance()->Compute(data, length, &crc32); + return static_cast<uint32_t>(crc32); // Only uses lower 32 bits. +} + +uint32_t Crc32c(const void* data, size_t length, uint32_t prev_crc32) { + uint64_t crc_tmp = static_cast<uint64_t>(prev_crc32); + GetCrc32cInstance()->Compute(data, length, &crc_tmp); + return static_cast<uint32_t>(crc_tmp); // Only uses lower 32 bits. 
+} + +} // namespace crc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/crc.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/crc.h b/be/src/kudu/util/crc.h new file mode 100644 index 0000000..a5db4ea --- /dev/null +++ b/be/src/kudu/util/crc.h @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_CRC_H_ +#define KUDU_UTIL_CRC_H_ + +#include <stdint.h> +#include <stdlib.h> + +#include <crcutil/interface.h> + +namespace kudu { +namespace crc { + +typedef crcutil_interface::CRC Crc; + +// Returns pointer to singleton instance of CRC32C implementation. +Crc* GetCrc32cInstance(); + +// Helper function to simply calculate a CRC32C of the given data. +uint32_t Crc32c(const void* data, size_t length); + +// Given CRC value of previous chunk of data, +// extends it to new chunk and returns the result. +uint32_t Crc32c(const void* data, size_t length, uint32_t prev_crc32); + +} // namespace crc +} // namespace kudu + +#endif // KUDU_UTIL_CRC_H_ http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/curl_util.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/curl_util.cc b/be/src/kudu/util/curl_util.cc new file mode 100644 index 0000000..4eddb64 --- /dev/null +++ b/be/src/kudu/util/curl_util.cc @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
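This file implements the EasyCurl wrapper declared in curl_util.h, which appears later in this patch. As a hedged orientation sketch before the implementation, a caller might use it as follows; FetchExample() and the URL are hypothetical:

#include <string>

#include "kudu/util/curl_util.h"
#include "kudu/util/faststring.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"

kudu::Status FetchExample(std::string* out) {
  kudu::EasyCurl curl;
  curl.set_timeout(kudu::MonoDelta::FromSeconds(5));
  kudu::faststring body;
  // FetchURL() returns non-OK for curl-level errors and for any HTTP
  // response code other than 200.
  RETURN_NOT_OK(curl.FetchURL("http://127.0.0.1:8080/healthz", &body,
                              {"Accept: application/json"}));
  *out = body.ToString();
  return kudu::Status::OK();
}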
+ +#include "kudu/util/curl_util.h" + +#include <cstddef> +#include <cstdint> +#include <ostream> + +#include <curl/curl.h> +#include <glog/logging.h> + +#include "kudu/gutil/strings/substitute.h" +#include "kudu/security/openssl_util.h" +#include "kudu/util/faststring.h" +#include "kudu/util/scoped_cleanup.h" + +namespace kudu { + +namespace { + +inline Status TranslateError(CURLcode code) { + if (code == CURLE_OK) { + return Status::OK(); + } + return Status::NetworkError("curl error", curl_easy_strerror(code)); +} + +extern "C" { +size_t WriteCallback(void* buffer, size_t size, size_t nmemb, void* user_ptr) { + size_t real_size = size * nmemb; + faststring* buf = reinterpret_cast<faststring*>(user_ptr); + CHECK_NOTNULL(buf)->append(reinterpret_cast<const uint8_t*>(buffer), real_size); + return real_size; +} +} // extern "C" + +} // anonymous namespace + +EasyCurl::EasyCurl() { + // Use our own SSL initialization, and disable curl's. + // Both of these calls are idempotent. + security::InitializeOpenSSL(); + CHECK_EQ(0, curl_global_init(CURL_GLOBAL_DEFAULT & ~CURL_GLOBAL_SSL)); + curl_ = curl_easy_init(); + CHECK(curl_) << "Could not init curl"; +} + +EasyCurl::~EasyCurl() { + curl_easy_cleanup(curl_); +} + +Status EasyCurl::FetchURL(const std::string& url, faststring* dst, + const std::vector<std::string>& headers) { + return DoRequest(url, nullptr, dst, headers); +} + +Status EasyCurl::PostToURL(const std::string& url, + const std::string& post_data, + faststring* dst) { + return DoRequest(url, &post_data, dst); +} + +Status EasyCurl::DoRequest(const std::string& url, + const std::string* post_data, + faststring* dst, + const std::vector<std::string>& headers) { + CHECK_NOTNULL(dst)->clear(); + + if (!verify_peer_) { + RETURN_NOT_OK(TranslateError(curl_easy_setopt( + curl_, CURLOPT_SSL_VERIFYHOST, 0))); + RETURN_NOT_OK(TranslateError(curl_easy_setopt( + curl_, CURLOPT_SSL_VERIFYPEER, 0))); + } + + // Add headers if specified. 
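+  // NOTE: curl_slist_append() copies the contents of 'header', but the list
+  // nodes themselves must stay alive until curl_easy_perform() returns; the
+  // scoped cleanup below frees them on every return path.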
+ struct curl_slist* curl_headers = nullptr; + auto clean_up_curl_slist = MakeScopedCleanup([&]() { + curl_slist_free_all(curl_headers); + }); + + for (const auto& header : headers) { + curl_headers = CHECK_NOTNULL(curl_slist_append(curl_headers, header.c_str())); + } + RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_HTTPHEADER, curl_headers))); + + RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_URL, url.c_str()))); + if (return_headers_) { + RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_HEADER, 1))); + } + RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_WRITEFUNCTION, WriteCallback))); + RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_WRITEDATA, + static_cast<void *>(dst)))); + if (post_data) { + RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_POSTFIELDS, + post_data->c_str()))); + } + + RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_HTTPAUTH, CURLAUTH_ANY))); + if (timeout_.Initialized()) { + RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_NOSIGNAL, 1))); + RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_TIMEOUT_MS, + timeout_.ToMilliseconds()))); + } + RETURN_NOT_OK(TranslateError(curl_easy_perform(curl_))); + long rc; // NOLINT(*) curl wants a long + RETURN_NOT_OK(TranslateError(curl_easy_getinfo(curl_, CURLINFO_RESPONSE_CODE, &rc))); + if (rc != 200) { + return Status::RemoteError(strings::Substitute("HTTP $0", rc)); + } + + return Status::OK(); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/curl_util.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/curl_util.h b/be/src/kudu/util/curl_util.h new file mode 100644 index 0000000..cccd2db --- /dev/null +++ b/be/src/kudu/util/curl_util.h @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_CURL_UTIL_H +#define KUDU_UTIL_CURL_UTIL_H + +#include <string> +#include <vector> + +#include "kudu/gutil/macros.h" +#include "kudu/util/monotime.h" +#include "kudu/util/status.h" + +typedef void CURL; + +namespace kudu { + +class faststring; + +// Simple wrapper around curl's "easy" interface, allowing the user to +// fetch web pages into memory using a blocking API. +// +// This is not thread-safe. +class EasyCurl { + public: + EasyCurl(); + ~EasyCurl(); + + // Fetch the given URL into the provided buffer. + // Any existing data in the buffer is replaced. + // The optional param 'headers' holds additional headers. + // e.g. {"Accept-Encoding: gzip"} + Status FetchURL(const std::string& url, + faststring* dst, + const std::vector<std::string>& headers = {}); + + // Issue an HTTP POST to the given URL with the given data. 
+ // Returns results in 'dst' as above. + Status PostToURL(const std::string& url, + const std::string& post_data, + faststring* dst); + + // Set whether to verify the server's SSL certificate in the case of an HTTPS + // connection. + void set_verify_peer(bool verify) { + verify_peer_ = verify; + } + + void set_return_headers(bool v) { + return_headers_ = v; + } + + void set_timeout(MonoDelta t) { + timeout_ = t; + } + + private: + // Do a request. If 'post_data' is non-NULL, does a POST. + // Otherwise, does a GET. + Status DoRequest(const std::string& url, + const std::string* post_data, + faststring* dst, + const std::vector<std::string>& headers = {}); + CURL* curl_; + + // Whether to verify the server certificate. + bool verify_peer_ = true; + + // Whether to return the HTTP headers with the response. + bool return_headers_ = false; + + MonoDelta timeout_; + + DISALLOW_COPY_AND_ASSIGN(EasyCurl); +}; + +} // namespace kudu + +#endif /* KUDU_UTIL_CURL_UTIL_H */ http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/debug-util-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/debug-util-test.cc b/be/src/kudu/util/debug-util-test.cc new file mode 100644 index 0000000..25e4ae0 --- /dev/null +++ b/be/src/kudu/util/debug-util-test.cc @@ -0,0 +1,458 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <dlfcn.h> +#ifdef __linux__ +#include <link.h> +#endif +#include <unistd.h> + +#include <algorithm> +#include <csignal> +#include <cstddef> +#include <cstdint> +#include <memory> +#include <ostream> +#include <string> +#include <vector> + +#include <gflags/gflags_declare.h> +#include <glog/logging.h> +#include <glog/stl_logging.h> +#include <gtest/gtest.h> + +#include "kudu/gutil/ref_counted.h" +#include "kudu/gutil/walltime.h" +#include "kudu/util/array_view.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/debug-util.h" +#include "kudu/util/kernel_stack_watchdog.h" +#include "kudu/util/monotime.h" +#include "kudu/util/scoped_cleanup.h" +#include "kudu/util/status.h" +#include "kudu/util/test_macros.h" +#include "kudu/util/test_util.h" +#include "kudu/util/thread.h" + +using std::string; +using std::vector; + +DECLARE_int32(test_timeout_after); +DECLARE_int32(stress_cpu_threads); + +namespace kudu { + +class DebugUtilTest : public KuduTest { +}; + +TEST_F(DebugUtilTest, TestStackTrace) { + StackTrace t; + t.Collect(0); + string trace = t.Symbolize(); + ASSERT_STR_CONTAINS(trace, "kudu::DebugUtilTest_TestStackTrace_Test::TestBody"); +} + +// DumpThreadStack is only supported on Linux, since the implementation relies +// on the tgkill syscall which is not portable. 
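A brief, hedged sketch of the API exercised by the tests below; LogStacks() is illustrative only, and the cross-thread path requires the Linux-only support noted above:

#include <glog/logging.h>

#include "kudu/util/debug-util.h"
#include "kudu/util/thread.h"

void LogStacks() {
  // In-process: collect and symbolize the calling thread's own stack.
  kudu::StackTrace st;
  st.Collect(0);  // 0: keep all caller frames; Collect() itself is excluded.
  LOG(INFO) << st.Symbolize();

  // Cross-thread (Linux only): signal a thread by tid and wait for it to
  // collect its stack. On failure the returned string describes the error.
  LOG(INFO) << kudu::DumpThreadStack(kudu::Thread::CurrentThreadId());
}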
+#if defined(__linux__) + +namespace { +void SleeperThread(CountDownLatch* l) { + // We use an infinite loop around WaitFor() instead of a normal Wait() + // so that this test passes in TSAN. Without this, we run into this TSAN + // bug which prevents the sleeping thread from handling signals: + // https://code.google.com/p/thread-sanitizer/issues/detail?id=91 + while (!l->WaitFor(MonoDelta::FromMilliseconds(10))) { + } +} + +void fake_signal_handler(int signum) {} + +bool IsSignalHandlerRegistered(int signum) { + struct sigaction cur_action; + CHECK_EQ(0, sigaction(signum, nullptr, &cur_action)); + return cur_action.sa_handler != SIG_DFL; +} +} // anonymous namespace + +TEST_F(DebugUtilTest, TestStackTraceInvalidTid) { + string s = DumpThreadStack(1); + ASSERT_STR_CONTAINS(s, "unable to deliver signal"); +} + +TEST_F(DebugUtilTest, TestStackTraceSelf) { + string s = DumpThreadStack(Thread::CurrentThreadId()); + ASSERT_STR_CONTAINS(s, "kudu::DebugUtilTest_TestStackTraceSelf_Test::TestBody()"); +} + +TEST_F(DebugUtilTest, TestStackTraceMainThread) { + string s = DumpThreadStack(getpid()); + ASSERT_STR_CONTAINS(s, "kudu::DebugUtilTest_TestStackTraceMainThread_Test::TestBody()"); +} + +TEST_F(DebugUtilTest, TestSignalStackTrace) { + CountDownLatch l(1); + scoped_refptr<Thread> t; + ASSERT_OK(Thread::Create("test", "test thread", &SleeperThread, &l, &t)); + auto cleanup_thr = MakeScopedCleanup([&]() { + // Allow the thread to finish. + l.CountDown(); + t->Join(); + }); + + // We have to loop a little bit because it takes a little while for the thread + // to start up and actually call our function. + ASSERT_EVENTUALLY([&]() { + ASSERT_STR_CONTAINS(DumpThreadStack(t->tid()), "SleeperThread"); + }); + + // Test that we can change the signal and that the stack traces still work, + // on the new signal. + ASSERT_FALSE(IsSignalHandlerRegistered(SIGHUP)); + ASSERT_OK(SetStackTraceSignal(SIGHUP)); + + // Should now be registered. + ASSERT_TRUE(IsSignalHandlerRegistered(SIGHUP)); + + // SIGUSR2 should be relinquished. + ASSERT_FALSE(IsSignalHandlerRegistered(SIGUSR2)); + + // Stack traces should work using the new handler. + ASSERT_STR_CONTAINS(DumpThreadStack(t->tid()), "SleeperThread"); + + // Switch back to SIGUSR2 and ensure it changes back. + ASSERT_OK(SetStackTraceSignal(SIGUSR2)); + ASSERT_TRUE(IsSignalHandlerRegistered(SIGUSR2)); + ASSERT_FALSE(IsSignalHandlerRegistered(SIGHUP)); + + // Stack traces should work using the new handler. + ASSERT_STR_CONTAINS(DumpThreadStack(t->tid()), "SleeperThread"); + + // Register our own signal handler on SIGHUP, and ensure that + // we get a bad Status if we try to use it. + signal(SIGHUP, &fake_signal_handler); + ASSERT_STR_CONTAINS(SetStackTraceSignal(SIGHUP).ToString(), + "unable to install signal handler"); + signal(SIGHUP, SIG_DFL); + + // Stack traces should be disabled + ASSERT_STR_CONTAINS(DumpThreadStack(t->tid()), "unable to take thread stack"); + + // Re-enable so that other tests pass. + ASSERT_OK(SetStackTraceSignal(SIGUSR2)); +} + +// Test which dumps all known threads within this process. +// We don't validate the results in any way -- but this verifies that we can +// dump library threads such as the libc timer_thread and properly time out. +TEST_F(DebugUtilTest, TestSnapshot) { + // HACK: prior tests in this suite start threads. 
Even though they Join on the + // threads before the test case finishes, there is actually a very short + // period of time after Join() returns but before the actual thread has exited + // and removed itself from /proc/self/task/. That means that 'ListThreads' below + // can sometimes show these threads from prior test cases, and then the assertions + // in this test case would fail. + // + // So, we have to wait here for the number of running threads to level off to the + // expected value. + // Ensure Kernel Stack Watchdog is running. + KernelStackWatchdog::GetInstance(); + int initial_thread_count = + 1 // main thread + + 1 // KernelStackWatchdog + + (FLAGS_test_timeout_after > 0 ? 1 : 0) // test timeout thread if running + + FLAGS_stress_cpu_threads; +#ifdef THREAD_SANITIZER + initial_thread_count++; // tsan signal thread +#endif + // The test and runtime environment runs various utility threads (for example, + // the kernel stack watchdog, the TSAN runtime thread, the test timeout thread, etc). + // Count them before we start any additional threads for this test. + ASSERT_EVENTUALLY([&]{ + vector<pid_t> threads; + ASSERT_OK(ListThreads(&threads)); + ASSERT_EQ(initial_thread_count, threads.size()) << threads; + }); + + // Start a bunch of sleeping threads. + const int kNumThreads = 30; + CountDownLatch l(1); + vector<scoped_refptr<Thread>> threads(kNumThreads); + for (int i = 0; i < kNumThreads; i++) { + ASSERT_OK(Thread::Create("test", "test thread", &SleeperThread, &l, &threads[i])); + } + + SCOPED_CLEANUP({ + // Allow the thread to finish. + l.CountDown(); + for (auto& t : threads) { + t->Join(); + } + }); + + StackTraceSnapshot snap; + ASSERT_OK(snap.SnapshotAllStacks()); + int count = 0; + int groups = 0; + snap.VisitGroups([&](ArrayView<StackTraceSnapshot::ThreadInfo> group) { + groups++; + for (const auto& info : group) { + count++; + LOG(INFO) << info.tid << " " << info.thread_name + << " (" << info.status.ToString() << ")"; + } + LOG(INFO) << group[0].stack.ToHexString(); + }); + int tsan_threads = 0; +#ifdef THREAD_SANITIZER + // TSAN starts an extra thread of its own. + tsan_threads++; +#endif + ASSERT_EQ(kNumThreads + initial_thread_count, count); + // The threads might not have exactly identical stacks, but + // we should have far fewer groups than the total number + // of threads. + ASSERT_LE(groups, kNumThreads / 2); + ASSERT_EQ(tsan_threads, snap.num_failed()); +} + +TEST_F(DebugUtilTest, Benchmark) { + CountDownLatch l(1); + scoped_refptr<Thread> t; + ASSERT_OK(Thread::Create("test", "test thread", &SleeperThread, &l, &t)); + SCOPED_CLEANUP({ + // Allow the thread to finish. + l.CountDown(); + t->Join(); + }); + + for (bool symbolize : {false, true}) { + MonoTime end_time = MonoTime::Now() + MonoDelta::FromSeconds(1); + int count = 0; + volatile int prevent_optimize = 0; + while (MonoTime::Now() < end_time) { + StackTrace trace; + GetThreadStack(t->tid(), &trace); + if (symbolize) { + prevent_optimize += trace.Symbolize().size(); + } + count++; + } + LOG(INFO) << "Throughput: " << count << " dumps/second (symbolize=" << symbolize << ")"; + } +} + +int TakeStackTrace(struct dl_phdr_info* /*info*/, size_t /*size*/, void* data) { + StackTrace* s = reinterpret_cast<StackTrace*>(data); + s->Collect(0); + return 0; +} + +// Test that if we try to collect a stack trace while inside a libdl function +// call that we properly return the bogus stack indicating the issue. 
+// +// This doesn't work in ThreadSanitizer since we don't intercept dl_iterate_phdr +// in those builds (see note in unwind_safeness.cc). +#ifndef THREAD_SANITIZER +TEST_F(DebugUtilTest, TestUnwindWhileUnsafe) { + StackTrace s; + dl_iterate_phdr(&TakeStackTrace, &s); + ASSERT_STR_CONTAINS(s.Symbolize(), "CouldNotCollectStackTraceBecauseInsideLibDl"); +} +#endif + +int DoNothingDlCallback(struct dl_phdr_info* /*info*/, size_t /*size*/, void* /*data*/) { + return 0; +} + +// Parameterized test which performs various operations which might be dangerous to +// collect a stack trace while the main thread tries to take stack traces. These +// operations are all possibly executed on normal application threads, so we need to +// ensure that if we happen to gather the stack from a thread in the middle of the +// function that we don't crash or deadlock. +// +// Example self-deadlock if we didn't have the appropriate workarounds in place: +// #0 __lll_lock_wait () +// #1 0x00007ffff6f16e42 in __GI___pthread_mutex_lock +// #2 0x00007ffff6c8601f in __GI___dl_iterate_phdr +// #3 0x0000000000695b02 in dl_iterate_phdr +// #4 0x000000000056d013 in _ULx86_64_dwarf_find_proc_info +// #5 0x000000000056d1d5 in fetch_proc_info (c=c@ent +// #6 0x000000000056e2e7 in _ULx86_64_dwarf_find_save_ +// #7 0x000000000056c1b9 in _ULx86_64_dwarf_step (c=c@ +// #8 0x000000000056be21 in _ULx86_64_step +// #9 0x0000000000566b1d in google::GetStackTrace +// #10 0x00000000004dc4d1 in kudu::StackTrace::Collect +// #11 kudu::(anonymous namespace)::HandleStackTraceSignal +// #12 <signal handler called> +// #13 0x00007ffff6f16e31 in __GI___pthread_mutex_lock +// #14 0x00007ffff6c8601f in __GI___dl_iterate_phdr +// #15 0x0000000000695b02 in dl_iterate_phdr +enum DangerousOp { + DLOPEN_AND_CLOSE, + DL_ITERATE_PHDR, + GET_STACK_TRACE, + MALLOC_AND_FREE +}; +class RaceTest : public DebugUtilTest, public ::testing::WithParamInterface<DangerousOp> {}; +INSTANTIATE_TEST_CASE_P(DifferentRaces, RaceTest, + ::testing::Values(DLOPEN_AND_CLOSE, + DL_ITERATE_PHDR, + GET_STACK_TRACE, + MALLOC_AND_FREE)); + +void DangerousOperationThread(DangerousOp op, CountDownLatch* l) { + while (l->count()) { + switch (op) { + case DLOPEN_AND_CLOSE: { + // Check races against dlopen/dlclose. + void* v = dlopen("libc.so.6", RTLD_LAZY); + CHECK(v); + dlclose(v); + break; + } + + case DL_ITERATE_PHDR: { + // Check for races against dl_iterate_phdr. + dl_iterate_phdr(&DoNothingDlCallback, nullptr); + break; + } + + case GET_STACK_TRACE: { + // Check for reentrancy issues + GetStackTrace(); + break; + } + + case MALLOC_AND_FREE: { + // Check large allocations in tcmalloc. + volatile char* x = new char[1024 * 1024 * 2]; + delete[] x; + break; + } + default: + LOG(FATAL) << "unknown op"; + } + } +} + +// Starts a thread performing dangerous operations and then gathers +// its stack trace in a loop trying to trigger races. +TEST_P(RaceTest, TestStackTraceRaces) { + DangerousOp op = GetParam(); + CountDownLatch l(1); + scoped_refptr<Thread> t; + ASSERT_OK(Thread::Create("test", "test thread", &DangerousOperationThread, op, &l, &t)); + SCOPED_CLEANUP({ + // Allow the thread to finish. + l.CountDown(); + // Crash if we can't join the thread after a reasonable amount of time. + // That probably indicates a deadlock. 
+ CHECK_OK(ThreadJoiner(t.get()).give_up_after_ms(10000).Join()); + }); + MonoTime end_time = MonoTime::Now() + MonoDelta::FromSeconds(1); + while (MonoTime::Now() < end_time) { + StackTrace trace; + GetThreadStack(t->tid(), &trace); + } +} + +void BlockSignalsThread() { + sigset_t set; + sigemptyset(&set); + sigaddset(&set, SIGUSR2); + for (int i = 0; i < 3; i++) { + CHECK_ERR(pthread_sigmask((i % 2) ? SIG_UNBLOCK : SIG_BLOCK, &set, nullptr)); + SleepFor(MonoDelta::FromSeconds(1)); + } +} + +TEST_F(DebugUtilTest, TestThreadBlockingSignals) { + scoped_refptr<Thread> t; + ASSERT_OK(Thread::Create("test", "test thread", &BlockSignalsThread, &t)); + SCOPED_CLEANUP({ t->Join(); }); + string ret; + while (ret.find("unable to deliver signal") == string::npos) { + ret = DumpThreadStack(t->tid()); + LOG(INFO) << ret; + } +} + +// Test stack traces which time out despite the destination thread not blocking +// signals. +TEST_F(DebugUtilTest, TestTimeouts) { + const int kRunTimeSecs = AllowSlowTests() ? 5 : 1; + + CountDownLatch l(1); + scoped_refptr<Thread> t; + ASSERT_OK(Thread::Create("test", "test thread", &SleeperThread, &l, &t)); + auto cleanup_thr = MakeScopedCleanup([&]() { + // Allow the thread to finish. + l.CountDown(); + t->Join(); + }); + + // First, time a few stack traces to determine how long a non-timed-out stack + // trace takes. + vector<MicrosecondsInt64> durations; + for (int i = 0; i < 20; i++) { + StackTrace stack; + auto st = GetMonoTimeMicros(); + ASSERT_OK(GetThreadStack(t->tid(), &stack)); + auto dur = GetMonoTimeMicros() - st; + durations.push_back(dur); + } + + // Compute the median to throw out outliers. + std::sort(durations.begin(), durations.end()); + auto median_duration = durations[durations.size() / 2]; + LOG(INFO) << "Median duration: " << median_duration << "us"; + + // Now take a bunch of stack traces with timeouts clustered around + // the expected time. When we time out, we adjust the timeout to be + // higher so the next attempt is less likely to time out. Conversely, + // when we succeed, we adjust the timeout to be shorter so the next + // attempt is more likely to time out. This has the effect of triggering + // all the interesting cases: (a) success, (b) timeout, (c) timeout + // exactly as the signal finishes. + int num_timeouts = 0; + int num_successes = 0; + auto end_time = MonoTime::Now() + MonoDelta::FromSeconds(kRunTimeSecs); + int64_t timeout_us = median_duration; + while (MonoTime::Now() < end_time) { + StackTraceCollector stc; + // Allocate Stack on the heap so that if we get a use-after-free it + // will be caught more easily by ASAN. 
+ std::unique_ptr<StackTrace> stack(new StackTrace()); + ASSERT_OK(stc.TriggerAsync(t->tid(), stack.get())); + Status s = stc.AwaitCollection(MonoTime::Now() + MonoDelta::FromMicroseconds(timeout_us)); + if (s.ok()) { + num_successes++; + timeout_us--; + } else if (s.IsTimedOut()) { + num_timeouts++; + timeout_us++; + } else { + FAIL() << "Unexpected status: " << s.ToString(); + } + } + LOG(INFO) << "Timed out " << num_timeouts << " times"; + LOG(INFO) << "Succeeded " << num_successes << " times"; +} + +#endif +} // namespace kudu http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/debug-util.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/debug-util.cc b/be/src/kudu/util/debug-util.cc new file mode 100644 index 0000000..03556d6 --- /dev/null +++ b/be/src/kudu/util/debug-util.cc @@ -0,0 +1,800 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/debug-util.h" + +#include <dirent.h> +#ifndef __linux__ +#include <sched.h> +#endif +#ifdef __linux__ +#include <syscall.h> +#else +#include <sys/syscall.h> +#endif +#include <unistd.h> + +#include <algorithm> +#include <atomic> +#include <cerrno> +#include <climits> +#include <csignal> +#include <ctime> +#include <iterator> +#include <memory> +#include <ostream> +#include <string> + +#include <glog/logging.h> +#include <glog/raw_logging.h> +#ifdef __linux__ +#define UNW_LOCAL_ONLY +#include <libunwind.h> +#endif + +#include "kudu/gutil/basictypes.h" +#include "kudu/gutil/dynamic_annotations.h" +#include "kudu/gutil/hash/city.h" +#include "kudu/gutil/linux_syscall_support.h" +#include "kudu/gutil/macros.h" +#include "kudu/gutil/once.h" +#include "kudu/gutil/spinlock.h" +#include "kudu/gutil/stringprintf.h" +#include "kudu/gutil/strings/numbers.h" +#include "kudu/gutil/strings/strip.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/array_view.h" +#include "kudu/util/debug/leak_annotations.h" +#ifndef __linux__ +#include "kudu/util/debug/sanitizer_scopes.h" +#endif +#include "kudu/util/debug/unwind_safeness.h" +#include "kudu/util/env.h" +#include "kudu/util/errno.h" +#include "kudu/util/faststring.h" +#include "kudu/util/monotime.h" +#include "kudu/util/os-util.h" +#include "kudu/util/scoped_cleanup.h" +#include "kudu/util/thread.h" + +using std::string; +using std::unique_ptr; +using std::vector; + +#if defined(__APPLE__) +typedef sig_t sighandler_t; +#endif + +// In coverage builds, this symbol will be defined and allows us to flush coverage info +// to disk before exiting. +#if defined(__APPLE__) + // OS X does not support weak linking at compile time properly. 
+ #if defined(COVERAGE_BUILD) +extern "C" void __gcov_flush() __attribute__((weak_import)); + #else +extern "C" void (*__gcov_flush)() = nullptr; + #endif +#else +extern "C" { +__attribute__((weak)) +void __gcov_flush(); +} +#endif + +// Evil hack to grab a few useful functions from glog +namespace google { + +extern int GetStackTrace(void** result, int max_depth, int skip_count); + +// Symbolizes a program counter. On success, returns true and write the +// symbol name to "out". The symbol name is demangled if possible +// (supports symbols generated by GCC 3.x or newer). Otherwise, +// returns false. +bool Symbolize(void *pc, char *out, int out_size); + +namespace glog_internal_namespace_ { +extern void DumpStackTraceToString(string *s); +} // namespace glog_internal_namespace_ +} // namespace google + +// The %p field width for printf() functions is two characters per byte. +// For some environments, add two extra bytes for the leading "0x". +static const int kPrintfPointerFieldWidth = 2 + 2 * sizeof(void*); + +// The signal that we'll use to communicate with our other threads. +// This can't be in used by other libraries in the process. +static int g_stack_trace_signum = SIGUSR2; + +// Protects g_stack_trace_signum and the installation of the signal +// handler. +static base::SpinLock g_signal_handler_lock(base::LINKER_INITIALIZED); + +namespace kudu { + +bool IsCoverageBuild() { + return __gcov_flush != nullptr; +} + +void TryFlushCoverage() { + static base::SpinLock lock(base::LINKER_INITIALIZED); + + // Flushing coverage is not reentrant or thread-safe. + if (!__gcov_flush || !lock.TryLock()) { + return; + } + + __gcov_flush(); + + lock.Unlock(); +} + + +namespace stack_trace_internal { + +// Simple notification mechanism based on futex. +// +// We use this instead of a mutex and condvar because we need +// to signal it from a signal handler, and mutexes are not async-safe. +// +// pthread semaphores are async-signal-safe but their timedwait function +// only supports wall clock waiting, which is a bit dangerous since we +// need strict timeouts here. +class CompletionFlag { + public: + + // Mark the flag as complete, waking all waiters. + void Signal() { + complete_ = true; +#ifndef __APPLE__ + sys_futex(reinterpret_cast<int32_t*>(&complete_), + FUTEX_WAKE | FUTEX_PRIVATE_FLAG, + INT_MAX, // wake all + 0 /* ignored */); +#endif + } + + // Wait for the flag to be marked as complete, up until the given deadline. + // Returns true if the flag was marked complete before the deadline. + bool WaitUntil(MonoTime deadline) { + if (complete_) return true; + + MonoTime now = MonoTime::Now(); + while (now < deadline) { +#ifndef __APPLE__ + MonoDelta rem = deadline - now; + struct timespec ts; + rem.ToTimeSpec(&ts); + sys_futex(reinterpret_cast<int32_t*>(&complete_), + FUTEX_WAIT | FUTEX_PRIVATE_FLAG, + 0, // wait if value is still 0 + reinterpret_cast<struct kernel_timespec *>(&ts)); +#else + sched_yield(); +#endif + if (complete_) { + return true; + } + now = MonoTime::Now(); + } + return complete_; + } + + void Reset() { + complete_ = false; + } + + bool complete() const { + return complete_; + } + private: + std::atomic<int32_t> complete_ { 0 }; +}; + + +// A pointer to this structure is passed as signal data to a thread when +// a stack trace is being remotely requested. 
+//
+// The state machine is as follows (each state is a tuple of 'queued_to_tid'
+// and 'result_ready' status):
+//
+//   [ kNotInUse, false ]
+//      |
+//      | (A)
+//      v                      (D)
+//   [ <target tid>, false ]   --->  [ kNotInUse, false ]   (leaked)
+//      |
+//      | (B)
+//      v                      (E)
+//   [ kDumpStarted, false ]   --->  [ kNotInUse, false ]   (tracer waits for 'result_ready')
+//      |                               |
+//      | (C)                           | (G)
+//      v                      (F)      v
+//   [ kDumpStarted, true ]    --->  [ kNotInUse, true ]    (already complete)
+//
+// Transitions:
+// (A): tracer thread sets 'queued_to_tid' to the target tid before sending a signal
+// (B): target thread CASes 'queued_to_tid' to kDumpStarted (and aborts on CAS failure)
+// (C,G): target thread finishes collecting stacks and signals 'result_ready'
+// (D,E,F): tracer thread exchanges 'kNotInUse' back into queued_to_tid in
+//          RevokeSigData().
+struct SignalData {
+  // The actual destination for the stack trace collected from the target thread.
+  StackTrace* stack;
+
+  static const int kNotInUse = 0;
+  static const int kDumpStarted = -1;
+  // Either one of the above constants, or if the dumper thread
+  // is waiting on a response, the tid that it is waiting on.
+  std::atomic<int64_t> queued_to_tid { kNotInUse };
+
+  // Signaled when the target thread has successfully collected its stack.
+  // The dumper thread waits for this to become true.
+  CompletionFlag result_ready;
+};
+
+} // namespace stack_trace_internal
+
+using stack_trace_internal::SignalData;
+
+namespace {
+
+// Signal handler for our stack trace signal.
+// We expect that the signal is only sent from DumpThreadStack() -- not by a user.
+void HandleStackTraceSignal(int /*signum*/, siginfo_t* info, void* /*ucontext*/) {
+  // Signal handlers may be invoked at any point, so it's important to preserve
+  // errno.
+  int save_errno = errno;
+  SCOPED_CLEANUP({
+    errno = save_errno;
+  });
+  auto* sig_data = reinterpret_cast<SignalData*>(info->si_value.sival_ptr);
+  DCHECK(sig_data);
+  if (!sig_data) {
+    // Maybe the signal was sent by a user instead of by ourselves, ignore it.
+    return;
+  }
+  ANNOTATE_HAPPENS_AFTER(sig_data);
+  int64_t my_tid = Thread::CurrentThreadId();
+
+  // If we were slow to process the signal, the sender may have given up and
+  // no longer wants our stack trace. In that case, the 'sig_data' object will
+  // no longer contain our thread.
+  if (!sig_data->queued_to_tid.compare_exchange_strong(my_tid, SignalData::kDumpStarted)) {
+    return;
+  }
+  // Marking it as kDumpStarted ensures that the caller thread must now wait
+  // for our response, since we are writing directly into their StackTrace object.
+  sig_data->stack->Collect(/*skip_frames=*/1);
+  sig_data->result_ready.Signal();
+}
+
+bool InitSignalHandlerUnlocked(int signum) {
+  enum InitState {
+    UNINITIALIZED,
+    INIT_ERROR,
+    INITIALIZED
+  };
+  static InitState state = UNINITIALIZED;
+
+  // If we've already registered a handler, but we're being asked to
+  // change our signal, unregister the old one.
+  if (signum != g_stack_trace_signum && state == INITIALIZED) {
+    struct sigaction old_act;
+    PCHECK(sigaction(g_stack_trace_signum, nullptr, &old_act) == 0);
+    if (old_act.sa_sigaction == &HandleStackTraceSignal) {
+      signal(g_stack_trace_signum, SIG_DFL);
+    }
+  }
+
+  // If we'd previously had an error, but the signal number
+  // is changing, we should mark ourselves uninitialized.
+ if (signum != g_stack_trace_signum) { + g_stack_trace_signum = signum; + state = UNINITIALIZED; + } + + if (state == UNINITIALIZED) { + struct sigaction old_act; + PCHECK(sigaction(g_stack_trace_signum, nullptr, &old_act) == 0); + if (old_act.sa_handler != SIG_DFL && + old_act.sa_handler != SIG_IGN) { + state = INIT_ERROR; + LOG(WARNING) << "signal handler for stack trace signal " + << g_stack_trace_signum + << " is already in use: " + << "Kudu will not produce thread stack traces."; + } else { + // No one appears to be using the signal. This is racy, but there is no + // atomic swap capability. + struct sigaction act; + memset(&act, 0, sizeof(act)); + act.sa_sigaction = &HandleStackTraceSignal; + act.sa_flags = SA_SIGINFO | SA_RESTART; + struct sigaction old_act; + CHECK_ERR(sigaction(g_stack_trace_signum, &act, &old_act)); + sighandler_t old_handler = old_act.sa_handler; + if (old_handler != SIG_IGN && + old_handler != SIG_DFL) { + LOG(FATAL) << "raced against another thread installing a signal handler"; + } + state = INITIALIZED; + } + } + return state == INITIALIZED; +} + +#ifdef __linux__ +GoogleOnceType g_prime_libunwind_once; + +void PrimeLibunwind() { + // The first call into libunwind does some unsafe double-checked locking + // for initialization. So, we make sure that the first call is not concurrent + // with any other call. + unw_cursor_t cursor; + unw_context_t uc; + unw_getcontext(&uc); + RAW_CHECK(unw_init_local(&cursor, &uc) >= 0, "unw_init_local failed"); +} +#endif +} // anonymous namespace + +Status SetStackTraceSignal(int signum) { + base::SpinLockHolder h(&g_signal_handler_lock); + if (!InitSignalHandlerUnlocked(signum)) { + return Status::InvalidArgument("unable to install signal handler"); + } + return Status::OK(); +} + +StackTraceCollector::StackTraceCollector(StackTraceCollector&& other) noexcept + : tid_(other.tid_), + sig_data_(other.sig_data_) { + other.tid_ = 0; + other.sig_data_ = nullptr; +} + +StackTraceCollector::~StackTraceCollector() { + if (sig_data_) { + RevokeSigData(); + } +} + +#ifdef __linux__ +bool StackTraceCollector::RevokeSigData() { + // First, exchange the atomic variable back to 'not in use'. This ensures + // that, if the signalled thread hasn't started filling in the trace yet, + // it will see the 'kNotInUse' value and abort. + int64_t old_val = sig_data_->queued_to_tid.exchange(SignalData::kNotInUse); + + // We now have two cases to consider. + + // 1) Timed out, but signal still pending and signal handler not yet invoked. + // + // In this case, the signal handler hasn't started collecting a stack trace, so when + // we exchange 'queued_to_tid', we see that it is still "queued". In case the signal + // later gets delivered, we can't free the 'sig_data_' struct itself. We intentionally + // leak it. Note, however, that if the signal handler later runs, it will see that we + // exchanged out its tid from 'queued_to_tid' and therefore won't attempt to write + // into the 'stack' structure. + if (old_val == tid_) { + // TODO(todd) instead of leaking, we can insert these lost structs into a global + // free-list, and then reuse them the next time we want to send a signal. The re-use + // is safe since access is limited to a specific tid. + DLOG(WARNING) << "Leaking SignalData structure " << sig_data_ << " after lost signal " + << "to thread " << tid_; + ANNOTATE_LEAKING_OBJECT_PTR(sig_data_); + sig_data_ = nullptr; + return false; + } + + // 2) The signal was delivered. 
Either the thread is currently collecting its stack + // trace (in which case we have to wait for it to finish), or it has already completed + // (in which case waiting is a no-op). + CHECK_EQ(old_val, SignalData::kDumpStarted); + CHECK(sig_data_->result_ready.WaitUntil(MonoTime::Max())); + delete sig_data_; + sig_data_ = nullptr; + return true; +} + + +Status StackTraceCollector::TriggerAsync(int64_t tid, StackTrace* stack) { + CHECK(!sig_data_ && tid_ == 0) << "TriggerAsync() must not be called more than once per instance"; + + // Ensure that our signal handler is installed. + { + base::SpinLockHolder h(&g_signal_handler_lock); + if (!InitSignalHandlerUnlocked(g_stack_trace_signum)) { + return Status::NotSupported("unable to take thread stack: signal handler unavailable"); + } + } + // Ensure that libunwind is primed for use before we send any signals. Otherwise + // we can hit a deadlock with the following stack: + // GoogleOnceInit() [waits on the 'once' to finish, but will never finish] + // StackTrace::Collect() + // <signal handler> + // PrimeLibUnwind + // GoogleOnceInit() [not yet initted, so starts initializing] + // StackTrace::Collect() + GoogleOnceInit(&g_prime_libunwind_once, &PrimeLibunwind); + + std::unique_ptr<SignalData> data(new SignalData()); + // Set the target TID in our communication structure, so if we end up with any + // delayed signal reaching some other thread, it will know to ignore it. + data->queued_to_tid = tid; + data->stack = CHECK_NOTNULL(stack); + + // We use the raw syscall here instead of kill() to ensure that we don't accidentally + // send a signal to some other process in the case that the thread has exited and + // the TID been recycled. + siginfo_t info; + memset(&info, 0, sizeof(info)); + info.si_signo = g_stack_trace_signum; + info.si_code = SI_QUEUE; + info.si_pid = getpid(); + info.si_uid = getuid(); + info.si_value.sival_ptr = data.get(); + // Since we're using a signal to pass information between the two threads, + // we need to help TSAN out and explicitly tell it about the happens-before + // relationship here. + ANNOTATE_HAPPENS_BEFORE(data.get()); + if (syscall(SYS_rt_tgsigqueueinfo, getpid(), tid, g_stack_trace_signum, &info) != 0) { + return Status::NotFound("unable to deliver signal: process may have exited"); + } + + // The signal is now pending to the target thread. We don't store it in a unique_ptr + // inside the class since we need to be careful to destruct it safely in case the + // target thread hasn't yet received the signal when this instance gets destroyed. + sig_data_ = data.release(); + tid_ = tid; + + return Status::OK(); +} + +Status StackTraceCollector::AwaitCollection(MonoTime deadline) { + CHECK(sig_data_) << "Must successfully call TriggerAsync() first"; + + // We give the thread ~1s to respond. In testing, threads typically respond within + // a few milliseconds, so this timeout is very conservative. + // + // The main reason that a thread would not respond is that it has blocked signals. For + // example, glibc's timer_thread doesn't respond to our signal, so we always time out + // on that one. + ignore_result(sig_data_->result_ready.WaitUntil(deadline)); + + // Whether or not we timed out above, revoke the signal data structure. + // It's possible that the above 'Wait' times out but it succeeds exactly + // after that timeout. In that case, RevokeSigData() will return true + // and we can return a successful result, because the destination stack trace + // has in fact been populated. 
+ bool completed = RevokeSigData(); + if (!completed) { + return Status::TimedOut("thread did not respond: maybe it is blocking signals"); + } + + return Status::OK(); +} + +#else // #ifdef __linux__ ... +Status StackTraceCollector::TriggerAsync(int64_t tid_, StackTrace* stack) { + return Status::NotSupported("unsupported platform"); +} +Status StackTraceCollector::AwaitCollection(MonoTime deadline) { + return Status::NotSupported("unsupported platform"); +} +bool StackTraceCollector::RevokeSigData() { + return false; +} +#endif // #ifdef __linux__ ... #else ... + +Status GetThreadStack(int64_t tid, StackTrace* stack) { + StackTraceCollector c; + RETURN_NOT_OK(c.TriggerAsync(tid, stack)); + RETURN_NOT_OK(c.AwaitCollection(MonoTime::Now() + MonoDelta::FromSeconds(1))); + return Status::OK(); +} + +string DumpThreadStack(int64_t tid) { + StackTrace trace; + Status s = GetThreadStack(tid, &trace); + if (s.ok()) { + return trace.Symbolize(); + } + return strings::Substitute("<$0>", s.ToString()); +} + +Status ListThreads(vector<pid_t> *tids) { +#ifndef __linux__ + return Status::NotSupported("unable to list threads on this platform"); +#else + DIR *dir = opendir("/proc/self/task/"); + if (dir == NULL) { + return Status::IOError("failed to open task dir", ErrnoToString(errno), errno); + } + struct dirent *d; + while ((d = readdir(dir)) != NULL) { + if (d->d_name[0] != '.') { + uint32_t tid; + if (!safe_strtou32(d->d_name, &tid)) { + LOG(WARNING) << "bad tid found in procfs: " << d->d_name; + continue; + } + tids->push_back(tid); + } + } + closedir(dir); + return Status::OK(); +#endif // __linux__ +} + +string GetStackTrace() { + string s; + google::glog_internal_namespace_::DumpStackTraceToString(&s); + return s; +} + +string GetStackTraceHex() { + char buf[1024]; + HexStackTraceToString(buf, 1024); + return buf; +} + +void HexStackTraceToString(char* buf, size_t size) { + StackTrace trace; + trace.Collect(1); + trace.StringifyToHex(buf, size); +} + +string GetLogFormatStackTraceHex() { + StackTrace trace; + trace.Collect(1); + return trace.ToLogFormatHexString(); +} + +// Bogus empty function which we use below to fill in the stack trace with +// something readable to indicate that stack trace collection was unavailable. +void CouldNotCollectStackTraceBecauseInsideLibDl() { +} + +void StackTrace::Collect(int skip_frames) { + if (!debug::SafeToUnwindStack()) { + // Build a fake stack so that the user sees an appropriate message upon symbolizing + // rather than seeing an empty stack. + uintptr_t f_ptr = reinterpret_cast<uintptr_t>(&CouldNotCollectStackTraceBecauseInsideLibDl); + // Increase the pointer by one byte since the return address from a function call + // would not be the beginning of the function itself. + frames_[0] = reinterpret_cast<void*>(f_ptr + 1); + num_frames_ = 1; + return; + } + const int kMaxDepth = arraysize(frames_); + +#ifdef __linux__ + GoogleOnceInit(&g_prime_libunwind_once, &PrimeLibunwind); + + unw_cursor_t cursor; + unw_context_t uc; + unw_getcontext(&uc); + RAW_CHECK(unw_init_local(&cursor, &uc) >= 0, "unw_init_local failed"); + skip_frames++; // Do not include the "Collect" frame + + num_frames_ = 0; + while (num_frames_ < kMaxDepth) { + void *ip; + int ret = unw_get_reg(&cursor, UNW_REG_IP, reinterpret_cast<unw_word_t *>(&ip)); + if (ret < 0) { + break; + } + if (skip_frames > 0) { + skip_frames--; + } else { + frames_[num_frames_++] = ip; + } + ret = unw_step(&cursor); + if (ret <= 0) { + break; + } + } +#else + // On OSX, use the unwinder from glog. 
However, that unwinder has an issue where + // concurrent invocations will return no frames. See: + // https://github.com/google/glog/issues/298 + // The worst result here is an empty result. + + // google::GetStackTrace has a data race. This is called frequently, so better + // to ignore it with an annotation rather than use a suppression. + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; + num_frames_ = google::GetStackTrace(frames_, kMaxDepth, skip_frames + 1); +#endif +} + +void StackTrace::StringifyToHex(char* buf, size_t size, int flags) const { + char* dst = buf; + + // Reserve kHexEntryLength for the first iteration of the loop, 1 byte for a + // space (which we may not need if there's just one frame), and 1 for a nul + // terminator. + char* limit = dst + size - kHexEntryLength - 2; + for (int i = 0; i < num_frames_ && dst < limit; i++) { + if (i != 0) { + *dst++ = ' '; + } + if (flags & HEX_0X_PREFIX) { + *dst++ = '0'; + *dst++ = 'x'; + } + // See note in Symbolize() below about why we subtract 1 from each address here. + uintptr_t addr = reinterpret_cast<uintptr_t>(frames_[i]); + if (addr > 0 && !(flags & NO_FIX_CALLER_ADDRESSES)) { + addr--; + } + FastHex64ToBuffer(addr, dst); + dst += kHexEntryLength; + } + *dst = '\0'; +} + +string StackTrace::ToHexString(int flags) const { + // Each frame requires kHexEntryLength, plus a space + // We also need one more byte at the end for '\0' + int len_per_frame = kHexEntryLength; + len_per_frame++; // For the separating space. + if (flags & HEX_0X_PREFIX) { + len_per_frame += 2; + } + int buf_len = kMaxFrames * len_per_frame + 1; + char buf[buf_len]; + StringifyToHex(buf, buf_len, flags); + return string(buf); +} + +// Symbolization function borrowed from glog. +string StackTrace::Symbolize() const { + string ret; + for (int i = 0; i < num_frames_; i++) { + void* pc = frames_[i]; + + char tmp[1024]; + const char* symbol = "(unknown)"; + + // The return address 'pc' on the stack is the address of the instruction + // following the 'call' instruction. In the case of calling a function annotated + // 'noreturn', this address may actually be the first instruction of the next + // function, because the function we care about ends with the 'call'. + // So, we subtract 1 from 'pc' so that we're pointing at the 'call' instead + // of the return address. + // + // For example, compiling a C program with -O2 that simply calls 'abort()' yields + // the following disassembly: + // Disassembly of section .text: + // + // 0000000000400440 <main>: + // 400440: 48 83 ec 08 sub $0x8,%rsp + // 400444: e8 c7 ff ff ff callq 400410 <abort@plt> + // + // 0000000000400449 <_start>: + // 400449: 31 ed xor %ebp,%ebp + // ... + // + // If we were to take a stack trace while inside 'abort', the return pointer + // on the stack would be 0x400449 (the first instruction of '_start'). By subtracting + // 1, we end up with 0x400448, which is still within 'main'. + // + // This also ensures that we point at the correct line number when using addr2line + // on logged stacks. + // + // We check that the pc is not 0 to avoid undefined behavior in the case of + // invalid unwinding (see KUDU-2433). 
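+ // As a worked instance of that fix-up (using the disassembly above): if the
+ // collected 'pc' is 0x400449, the first byte of '_start', we pass 0x400448
+ // to the symbolizer, which still falls inside 'main', so the correct frame
+ // is reported.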
+ if (pc && google::Symbolize( + reinterpret_cast<char *>(pc) - 1, tmp, sizeof(tmp))) { + symbol = tmp; + } + StringAppendF(&ret, " @ %*p %s\n", kPrintfPointerFieldWidth, pc, symbol); + } + return ret; +} + +string StackTrace::ToLogFormatHexString() const { + string ret; + for (int i = 0; i < num_frames_; i++) { + void* pc = frames_[i]; + StringAppendF(&ret, " @ %*p\n", kPrintfPointerFieldWidth, pc); + } + return ret; +} + +uint64_t StackTrace::HashCode() const { + return util_hash::CityHash64(reinterpret_cast<const char*>(frames_), + sizeof(frames_[0]) * num_frames_); +} + +bool StackTrace::LessThan(const StackTrace& s) const { + return std::lexicographical_compare(frames_, &frames_[num_frames_], + s.frames_, &s.frames_[num_frames_]); +} + +Status StackTraceSnapshot::SnapshotAllStacks() { + if (IsBeingDebugged()) { + return Status::Incomplete("not collecting stack trace since debugger or strace is attached"); + } + + vector<pid_t> tids; + RETURN_NOT_OK_PREPEND(ListThreads(&tids), "could not list threads"); + + collectors_.clear(); + collectors_.resize(tids.size()); + infos_.clear(); + infos_.resize(tids.size()); + for (int i = 0; i < tids.size(); i++) { + infos_[i].tid = tids[i]; + infos_[i].status = collectors_[i].TriggerAsync(tids[i], &infos_[i].stack); + } + + // Now collect the thread names while we are waiting on stack trace collection. + if (capture_thread_names_) { + for (auto& info : infos_) { + if (!info.status.ok()) continue; + + // Get the thread's name by reading proc. + // TODO(todd): should we have the dumped thread fill in its own name using + // prctl to avoid having to open and read /proc? Or maybe we should use the + // Kudu ThreadMgr to get the thread names for the cases where we are using + // the kudu::Thread wrapper at least. + faststring buf; + Status s = ReadFileToString(Env::Default(), + strings::Substitute("/proc/self/task/$0/comm", info.tid), + &buf); + if (!s.ok()) { + info.thread_name = "<unknown name>"; + } else { + info.thread_name = buf.ToString(); + StripTrailingNewline(&info.thread_name); + } + } + } + num_failed_ = 0; + MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(1); + for (int i = 0; i < infos_.size(); i++) { + infos_[i].status = infos_[i].status.AndThen([&] { + return collectors_[i].AwaitCollection(deadline); + }); + if (!infos_[i].status.ok()) { + num_failed_++; + CHECK(!infos_[i].stack.HasCollected()) << infos_[i].status.ToString(); + } + } + collectors_.clear(); + + std::sort(infos_.begin(), infos_.end(), [](const ThreadInfo& a, const ThreadInfo& b) { + return a.stack.LessThan(b.stack); + }); + return Status::OK(); +} + +void StackTraceSnapshot::VisitGroups(const StackTraceSnapshot::VisitorFunc& visitor) { + auto group_start = infos_.begin(); + auto group_end = group_start; + while (group_end != infos_.end()) { + do { + ++group_end; + } while (group_end != infos_.end() && group_end->stack.Equals(group_start->stack)); + visitor(ArrayView<ThreadInfo>(&*group_start, std::distance(group_start, group_end))); + group_start = group_end; + } +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/debug-util.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/debug-util.h b/be/src/kudu/util/debug-util.h new file mode 100644 index 0000000..e8c94ea --- /dev/null +++ b/be/src/kudu/util/debug-util.h @@ -0,0 +1,321 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_DEBUG_UTIL_H +#define KUDU_UTIL_DEBUG_UTIL_H + +#include <sys/types.h> + +#include <cstdint> +#include <cstring> +#include <functional> +#include <string> +#include <vector> + +#include <glog/logging.h> + +#include "kudu/gutil/macros.h" +#include "kudu/gutil/strings/fastmem.h" +#include "kudu/util/status.h" + +namespace kudu { + +template <typename T> class ArrayView; +class MonoTime; +class StackTrace; +class StackTraceCollector; + +namespace stack_trace_internal { +struct SignalData; +} + +// Return true if coverage is enabled. +bool IsCoverageBuild(); + +// Try to flush coverage info. If another thread is already flushing +// coverage, this returns without doing anything, since flushing coverage +// is not thread-safe or re-entrant. +void TryFlushCoverage(); + +// Return a list of all of the thread IDs currently running in this process. +// Not async-safe. +Status ListThreads(std::vector<pid_t>* tids); + +// Set which POSIX signal number should be used internally for triggering +// stack traces. If the specified signal handler is already in use, this +// returns an error, and stack traces will be disabled. +Status SetStackTraceSignal(int signum); + +// Return the stack trace of the given thread, stringified and symbolized. +// +// Note that the symbolization happens on the calling thread, not the target +// thread, so this is relatively low-impact on the target. +// +// This is safe to use against the current thread, the main thread, or any other +// thread. It requires that the target thread has not blocked POSIX signals. If +// it has, an error message will be returned. +// +// This function is thread-safe. +// +// NOTE: if Kudu is running inside a debugger, this can be annoying to a developer since +// it internally uses signals that will cause the debugger to stop. Consider checking +// 'IsBeingDebugged()' from os-util.h before using this function for non-critical use +// cases. +std::string DumpThreadStack(int64_t tid); + +// Capture the thread stack of another thread +// +// NOTE: if Kudu is running inside a debugger, this can be annoying to a developer since +// it internally uses signals that will cause the debugger to stop. Consider checking +// 'IsBeingDebugged()' from os-util.h before using this function for non-critical use +// cases. +Status GetThreadStack(int64_t tid, StackTrace* stack); + +// Return the current stack trace, stringified. +std::string GetStackTrace(); + +// Return the current stack trace, in hex form. This is significantly +// faster than GetStackTrace() above, so should be used in performance-critical +// places like TRACE() calls. If you really need blazing-fast speed, though, +// use HexStackTraceToString() into a stack-allocated buffer instead -- +// this call causes a heap allocation for the std::string. 
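+// For example, a latency-sensitive caller might do the following instead
+// (illustrative sketch only, not part of the patch; the buffer size is
+// arbitrary, mirroring GetStackTraceHex() in debug-util.cc):
+//
+//   char buf[1024];
+//   HexStackTraceToString(buf, sizeof(buf));  // async-safe, no heap allocation
+//   LOG(INFO) << buf;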
+// +// Note that this is much more useful in the context of a static binary, +// since addr2line wouldn't know where shared libraries were mapped at +// runtime. +std::string GetStackTraceHex(); + +// This is the same as GetStackTraceHex(), except multi-line in a format that +// looks very similar to GetStackTrace() but without symbols. Because it's in +// that format, the tool stacktrace_addr2line.pl in the kudu build-support +// directory can symbolize it automatically (to the extent that addr2line(1) +// is able to find the symbols). +std::string GetLogFormatStackTraceHex(); + +// Collect the current stack trace in hex form into the given buffer. +// +// The resulting trace just includes the hex addresses, space-separated. This is suitable +// for later stringification by pasting into 'addr2line' for example. +// +// This function is async-safe. +void HexStackTraceToString(char* buf, size_t size); + +// Efficient class for collecting and later stringifying a stack trace. +// +// Requires external synchronization. +class StackTrace { + public: + + // Constructs a new (uncollected) stack trace. + StackTrace() + : num_frames_(0) { + } + + // Resets the stack trace to an uncollected state. + void Reset() { + num_frames_ = 0; + } + + // Returns true if Collect() (but not Reset()) has been called on this stack trace. + bool HasCollected() const { + return num_frames_ > 0; + } + + // Copies the contents of 's' into this stack trace. + void CopyFrom(const StackTrace& s) { + memcpy(this, &s, sizeof(s)); + } + + // Returns true if the stack trace 's' matches this trace. + bool Equals(const StackTrace& s) const { + return s.num_frames_ == num_frames_ && + strings::memeq(frames_, s.frames_, + num_frames_ * sizeof(frames_[0])); + } + + // Comparison operator for use in sorting. + bool LessThan(const StackTrace& s) const; + + // Collect and store the current stack trace. Skips the top 'skip_frames' frames + // from the stack. For example, a value of '1' will skip whichever function + // called the 'Collect()' function. The 'Collect' function itself is always skipped. + // + // This function is async-safe. + void Collect(int skip_frames = 0); + + int num_frames() const { + return num_frames_; + } + + void* frame(int i) const { + DCHECK_LE(i, num_frames_); + return frames_[i]; + } + + enum Flags { + // Do not fix up the addresses on the stack to try to point to the 'call' + // instructions instead of the return address. This is necessary when dumping + // addresses to be interpreted by 'pprof', which does this fix-up itself. + NO_FIX_CALLER_ADDRESSES = 1, + + // Prefix each hex address with '0x'. This is required by the go version + // of pprof when parsing stack traces. + HEX_0X_PREFIX = 1 << 1, + }; + + // Stringify the trace into the given buffer. + // The resulting output is hex addresses suitable for passing into 'addr2line' + // later. + // + // Async-safe. + void StringifyToHex(char* buf, size_t size, int flags = 0) const; + + // Same as above, but returning a std::string. + // This is not async-safe. + std::string ToHexString(int flags = 0) const; + + // Return a string with a symbolized backtrace in a format suitable for + // printing to a log file. + // This is not async-safe. + std::string Symbolize() const; + + // Return a string with a hex-only backtrace in the format typically used in + // log files. Similar to the format given by Symbolize(), but symbols are not + // resolved (only the hex addresses are given). 
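+ // Example use (an illustrative sketch, not from the patch):
+ //
+ //   StackTrace st;
+ //   st.Collect();
+ //   LOG(INFO) << "unexpected state; stack:\n" << st.ToLogFormatHexString();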
+ std::string ToLogFormatHexString() const;
+
+ uint64_t HashCode() const;
+
+ private:
+ enum {
+ // The maximum number of stack frames to collect.
+ kMaxFrames = 16,
+
+ // The max number of characters any frame requires in string form.
+ kHexEntryLength = 16
+ };
+
+ int num_frames_;
+ void* frames_[kMaxFrames];
+};
+
+// Utility class for gathering a process-wide snapshot of the stack traces
+// of all threads.
+class StackTraceSnapshot {
+ public:
+ // The information about each thread will be gathered in a struct.
+ struct ThreadInfo {
+ // The TID of the thread.
+ int64_t tid;
+
+ // The status of collection. If a thread exits during collection or
+ // was blocking signals, it's possible to have an error here.
+ Status status;
+
+ // The name of the thread.
+ // May be missing if 'status' is not OK or if thread name collection was
+ // disabled.
+ std::string thread_name;
+
+ // The current stack trace of the thread.
+ // Always missing if 'status' is not OK.
+ StackTrace stack;
+ };
+ using VisitorFunc = std::function<void(ArrayView<ThreadInfo> group)>;
+
+ void set_capture_thread_names(bool c) {
+ capture_thread_names_ = c;
+ }
+
+ // Snapshot the stack traces of all threads in the process. This may return a bad
+ // Status in the case that stack traces aren't supported on the platform, or if
+ // the process is running inside a debugger.
+ //
+ // NOTE: this may take some time and should not be called in a latency-sensitive
+ // context.
+ Status SnapshotAllStacks();
+
+ // After having collected stacks, visit them, grouped by shared
+ // stack trace. The visitor function will be called once per group.
+ // Each group is guaranteed to be non-empty.
+ //
+ // Any threads which failed to collect traces are returned as a single group
+ // having empty stack traces.
+ //
+ // REQUIRES: a previous successful call to SnapshotAllStacks().
+ void VisitGroups(const VisitorFunc& visitor);
+
+ // Return the number of threads which were interrogated for a stack trace.
+ //
+ // NOTE: this includes threads which failed to collect.
+ int num_threads() const { return infos_.size(); }
+
+ // Return the number of threads which failed to collect a stack trace.
+ int num_failed() const { return num_failed_; }
+
+ private:
+ std::vector<StackTraceSnapshot::ThreadInfo> infos_;
+ std::vector<StackTraceCollector> collectors_;
+ int num_failed_ = 0;
+
+ bool capture_thread_names_ = true;
+};
+
+
+// Class to collect the stack trace of another thread within this process.
+// This allows for more advanced use cases than 'DumpThreadStack(tid)' above.
+// Namely, this provides an asynchronous trigger/collect API so that many
+// stack traces can be collected from many different threads in parallel using
+// different instances of this object.
+class StackTraceCollector {
+ public:
+ StackTraceCollector() = default;
+ StackTraceCollector(StackTraceCollector&& other) noexcept;
+ ~StackTraceCollector();
+
+ // Send the asynchronous request to the thread with TID 'tid'
+ // to collect its stack trace into '*stack'.
+ //
+ // NOTE: 'stack' must remain a valid pointer until AwaitCollection() has
+ // completed.
+ //
+ // Returns OK if the signal was sent successfully.
+ Status TriggerAsync(int64_t tid, StackTrace* stack);
+
+ // Wait for the stack trace to be collected from the target thread.
+ //
+ // REQUIRES: TriggerAsync() has returned successfully.
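+ // Typical trigger/await pairing (mirrors GetThreadStack() in debug-util.cc):
+ //
+ //   StackTrace stack;
+ //   StackTraceCollector c;
+ //   RETURN_NOT_OK(c.TriggerAsync(tid, &stack));
+ //   RETURN_NOT_OK(c.AwaitCollection(MonoTime::Now() + MonoDelta::FromSeconds(1)));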
+ Status AwaitCollection(MonoTime deadline); + + private: + DISALLOW_COPY_AND_ASSIGN(StackTraceCollector); + + // Safely sets 'sig_data_' back to nullptr after having sent an asynchronous + // stack trace request. See implementation for details. + // + // Returns true if the stack trace was collected before revocation + // and false if it was not. + // + // POSTCONDITION: sig_data_ == nullptr + bool RevokeSigData(); + + int64_t tid_ = 0; + stack_trace_internal::SignalData* sig_data_ = nullptr; +}; + +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/debug/leak_annotations.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/debug/leak_annotations.h b/be/src/kudu/util/debug/leak_annotations.h new file mode 100644 index 0000000..2bfc3d8 --- /dev/null +++ b/be/src/kudu/util/debug/leak_annotations.h @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_DEBUG_LEAK_ANNOTATIONS_H_ +#define KUDU_UTIL_DEBUG_LEAK_ANNOTATIONS_H_ + +// Ignore a single leaked object, given its pointer. +// Does nothing if LeakSanitizer is not enabled. +#define ANNOTATE_LEAKING_OBJECT_PTR(p) + +#if defined(__has_feature) +# if __has_feature(address_sanitizer) +# if defined(__linux__) + +#undef ANNOTATE_LEAKING_OBJECT_PTR +#define ANNOTATE_LEAKING_OBJECT_PTR(p) __lsan_ignore_object(p); + +# endif +# endif +#endif + +// API definitions from LLVM lsan_interface.h + +extern "C" { + // Allocations made between calls to __lsan_disable() and __lsan_enable() will + // be treated as non-leaks. Disable/enable pairs may be nested. + void __lsan_disable(); + void __lsan_enable(); + + // The heap object into which p points will be treated as a non-leak. + void __lsan_ignore_object(const void *p); + + // The user may optionally provide this function to disallow leak checking + // for the program it is linked into (if the return value is non-zero). This + // function must be defined as returning a constant value; any behavior beyond + // that is unsupported. + int __lsan_is_turned_off(); + + // Check for leaks now. This function behaves identically to the default + // end-of-process leak check. In particular, it will terminate the process if + // leaks are found and the exitcode runtime flag is non-zero. + // Subsequent calls to this function will have no effect and end-of-process + // leak check will not run. Effectively, end-of-process leak check is moved to + // the time of first invocation of this function. + // By calling this function early during process shutdown, you can instruct + // LSan to ignore shutdown-only leaks which happen later on. + void __lsan_do_leak_check(); + + // Check for leaks now. 
Returns zero if no leaks have been found or if leak + // detection is disabled, non-zero otherwise. + // This function may be called repeatedly, e.g. to periodically check a + // long-running process. It prints a leak report if appropriate, but does not + // terminate the process. It does not affect the behavior of + // __lsan_do_leak_check() or the end-of-process leak check, and is not + // affected by them. + int __lsan_do_recoverable_leak_check(); +} // extern "C" + +namespace kudu { +namespace debug { + +class ScopedLSANDisabler { + public: + ScopedLSANDisabler() { __lsan_disable(); } + ~ScopedLSANDisabler() { __lsan_enable(); } +}; + +} // namespace debug +} // namespace kudu + +#endif // KUDU_UTIL_DEBUG_LEAK_ANNOTATIONS_H_
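As a closing note on the annotations above, here is a minimal usage sketch, assuming a build where the LSAN runtime is linked in (otherwise the __lsan_* symbols will not resolve). The 'Registry' type and both function names are invented for illustration; only ScopedLSANDisabler, ANNOTATE_LEAKING_OBJECT_PTR, and the __lsan_* declarations come from the header itself.

#include "kudu/util/debug/leak_annotations.h"

struct Registry { int value = 0; };  // hypothetical type, for illustration only

// A deliberate process-lifetime singleton: allocations made while the
// disabler is in scope are treated as non-leaks by LeakSanitizer.
Registry* GlobalRegistry() {
  static Registry* r = [] {
    kudu::debug::ScopedLSANDisabler no_leak_check;
    return new Registry();
  }();
  return r;
}

// Alternatively, exempt a single known-lost object by pointer, as
// RevokeSigData() in debug-util.cc does for a SignalData struct that may
// still be referenced by a pending signal.
Registry* LeakOneObject() {
  Registry* r = new Registry();
  ANNOTATE_LEAKING_OBJECT_PTR(r);
  return r;
}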