http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/env_util.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/env_util.cc b/be/src/kudu/util/env_util.cc new file mode 100644 index 0000000..e9117a4 --- /dev/null +++ b/be/src/kudu/util/env_util.cc @@ -0,0 +1,290 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/env_util.h" + +#include <algorithm> +#include <memory> +#include <string> +#include <utility> +#include <vector> + +#include <gflags/gflags.h> +#include <glog/logging.h> + +#include "kudu/gutil/bind.h" +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/strings/numbers.h" +#include "kudu/gutil/strings/split.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/gutil/strings/util.h" +#include "kudu/util/debug-util.h" +#include "kudu/util/env.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/path_util.h" +#include "kudu/util/status.h" + +DEFINE_int64(disk_reserved_bytes_free_for_testing, -1, + "For testing only! Set to number of bytes free on each filesystem. 
" + "Set to -1 to disable this test-specific override"); +TAG_FLAG(disk_reserved_bytes_free_for_testing, runtime); +TAG_FLAG(disk_reserved_bytes_free_for_testing, unsafe); + +// We define some flags for testing purposes: Two prefixes and their associated +// "bytes free" overrides. +DEFINE_string(disk_reserved_override_prefix_1_path_for_testing, "", + "For testing only! Specifies a prefix to override the visible 'bytes free' on. " + "Use --disk_reserved_override_prefix_1_bytes_free_for_testing to set the number of " + "bytes free for this path prefix. Set to empty string to disable."); +DEFINE_int64(disk_reserved_override_prefix_1_bytes_free_for_testing, -1, + "For testing only! Set number of bytes free on the path prefix specified by " + "--disk_reserved_override_prefix_1_path_for_testing. Set to -1 to disable."); +DEFINE_string(disk_reserved_override_prefix_2_path_for_testing, "", + "For testing only! Specifies a prefix to override the visible 'bytes free' on. " + "Use --disk_reserved_override_prefix_2_bytes_free_for_testing to set the number of " + "bytes free for this path prefix. Set to empty string to disable."); +DEFINE_int64(disk_reserved_override_prefix_2_bytes_free_for_testing, -1, + "For testing only! Set number of bytes free on the path prefix specified by " + "--disk_reserved_override_prefix_2_path_for_testing. 
Set to -1 to disable."); +TAG_FLAG(disk_reserved_override_prefix_1_path_for_testing, unsafe); +TAG_FLAG(disk_reserved_override_prefix_2_path_for_testing, unsafe); +TAG_FLAG(disk_reserved_override_prefix_1_bytes_free_for_testing, unsafe); +TAG_FLAG(disk_reserved_override_prefix_2_bytes_free_for_testing, unsafe); +TAG_FLAG(disk_reserved_override_prefix_1_bytes_free_for_testing, runtime); +TAG_FLAG(disk_reserved_override_prefix_2_bytes_free_for_testing, runtime); + +using std::shared_ptr; +using std::string; +using std::unique_ptr; +using std::vector; +using strings::Substitute; + +namespace kudu { +namespace env_util { + +Status OpenFileForWrite(Env* env, const string& path, + shared_ptr<WritableFile>* file) { + return OpenFileForWrite(WritableFileOptions(), env, path, file); +} + +Status OpenFileForWrite(const WritableFileOptions& opts, + Env *env, const string &path, + shared_ptr<WritableFile> *file) { + unique_ptr<WritableFile> w; + RETURN_NOT_OK(env->NewWritableFile(opts, path, &w)); + file->reset(w.release()); + return Status::OK(); +} + +Status OpenFileForRandom(Env *env, const string &path, + shared_ptr<RandomAccessFile> *file) { + unique_ptr<RandomAccessFile> r; + RETURN_NOT_OK(env->NewRandomAccessFile(path, &r)); + file->reset(r.release()); + return Status::OK(); +} + +Status OpenFileForSequential(Env *env, const string &path, + shared_ptr<SequentialFile> *file) { + unique_ptr<SequentialFile> r; + RETURN_NOT_OK(env->NewSequentialFile(path, &r)); + file->reset(r.release()); + return Status::OK(); +} + +// If any of the override gflags specifies an override for the given path, then +// override the free bytes to match what is specified in the flag. See the +// definitions of these test-only flags for more information. 
+static void OverrideBytesFreeWithTestingFlags(const string& path, int64_t* bytes_free) { + const string* prefixes[] = { &FLAGS_disk_reserved_override_prefix_1_path_for_testing, + &FLAGS_disk_reserved_override_prefix_2_path_for_testing }; + const int64_t* overrides[] = { &FLAGS_disk_reserved_override_prefix_1_bytes_free_for_testing, + &FLAGS_disk_reserved_override_prefix_2_bytes_free_for_testing }; + for (int i = 0; i < arraysize(prefixes); i++) { + if (*overrides[i] != -1 && !prefixes[i]->empty() && HasPrefixString(path, *prefixes[i])) { + *bytes_free = *overrides[i]; + return; + } + } +} + +Status VerifySufficientDiskSpace(Env *env, const std::string& path, + int64_t requested_bytes, int64_t reserved_bytes) { + const int64_t kOnePercentReservation = -1; + DCHECK_GE(requested_bytes, 0); + + SpaceInfo space_info; + RETURN_NOT_OK(env->GetSpaceInfo(path, &space_info)); + int64_t available_bytes = space_info.free_bytes; + + // Allow overriding these values by tests. + if (PREDICT_FALSE(FLAGS_disk_reserved_bytes_free_for_testing > -1)) { + available_bytes = FLAGS_disk_reserved_bytes_free_for_testing; + } + if (PREDICT_FALSE(FLAGS_disk_reserved_override_prefix_1_bytes_free_for_testing != -1 || + FLAGS_disk_reserved_override_prefix_2_bytes_free_for_testing != -1)) { + OverrideBytesFreeWithTestingFlags(path, &available_bytes); + } + + // If they requested a one percent reservation, calculate what that is in bytes. 
+ if (reserved_bytes == kOnePercentReservation) { + reserved_bytes = space_info.capacity_bytes / 100; + } + + if (available_bytes - requested_bytes < reserved_bytes) { + return Status::IOError(Substitute("Insufficient disk space to allocate $0 bytes under path $1 " + "($2 bytes available vs $3 bytes reserved)", + requested_bytes, path, available_bytes, reserved_bytes), + "", ENOSPC); + } + return Status::OK(); +} + +Status CreateDirIfMissing(Env* env, const string& path, bool* created) { + Status s = env->CreateDir(path); + if (created != nullptr) { + *created = s.ok(); + } + return s.IsAlreadyPresent() ? Status::OK() : s; +} + +Status CreateDirsRecursively(Env* env, const string& path) { + vector<string> segments = SplitPath(path); + string partial_path; + for (const string& segment : segments) { + partial_path = partial_path.empty() ? segment : JoinPathSegments(partial_path, segment); + bool is_dir; + Status s = env->IsDirectory(partial_path, &is_dir); + if (s.ok()) { + // We didn't get a NotFound error, so something is there. + if (is_dir) continue; // It's a normal directory. + // Maybe a file or a symlink. Let's try to follow the symlink. + string real_partial_path; + RETURN_NOT_OK(env->Canonicalize(partial_path, &real_partial_path)); + s = env->IsDirectory(real_partial_path, &is_dir); + if (s.ok() && is_dir) continue; // It's a symlink to a directory. 
+ } + RETURN_NOT_OK_PREPEND(env->CreateDir(partial_path), "Unable to create directory"); + } + return Status::OK(); +} + +Status CopyFile(Env* env, const string& source_path, const string& dest_path, + WritableFileOptions opts) { + unique_ptr<SequentialFile> source; + RETURN_NOT_OK(env->NewSequentialFile(source_path, &source)); + uint64_t size; + RETURN_NOT_OK(env->GetFileSize(source_path, &size)); + + unique_ptr<WritableFile> dest; + RETURN_NOT_OK(env->NewWritableFile(opts, dest_path, &dest)); + RETURN_NOT_OK(dest->PreAllocate(size)); + + const int32_t kBufferSize = 1024 * 1024; + unique_ptr<uint8_t[]> scratch(new uint8_t[kBufferSize]); + + uint64_t bytes_read = 0; + while (bytes_read < size) { + uint64_t max_bytes_to_read = std::min<uint64_t>(size - bytes_read, kBufferSize); + Slice data(scratch.get(), max_bytes_to_read); + RETURN_NOT_OK(source->Read(&data)); + RETURN_NOT_OK(dest->Append(data)); + bytes_read += data.size(); + } + return Status::OK(); +} + +Status DeleteExcessFilesByPattern(Env* env, const string& pattern, int max_matches) { + // Negative numbers don't make sense for our interface. + DCHECK_GE(max_matches, 0); + + vector<string> matching_files; + RETURN_NOT_OK(env->Glob(pattern, &matching_files)); + + if (matching_files.size() <= max_matches) { + return Status::OK(); + } + + vector<pair<time_t, string>> matching_file_mtimes; + for (string& matching_file_path : matching_files) { + int64_t mtime; + RETURN_NOT_OK(env->GetFileModifiedTime(matching_file_path, &mtime)); + matching_file_mtimes.emplace_back(mtime, std::move(matching_file_path)); + } + + // Use mtime to determine which matching files to delete. This could + // potentially be ambiguous, depending on the resolution of last-modified + // timestamp in the filesystem, but that is part of the contract. 
+ std::sort(matching_file_mtimes.begin(), matching_file_mtimes.end()); + matching_file_mtimes.resize(matching_file_mtimes.size() - max_matches); + + for (const auto& matching_file : matching_file_mtimes) { + RETURN_NOT_OK(env->DeleteFile(matching_file.second)); + } + + return Status::OK(); +} + +// Callback for DeleteTmpFilesRecursively(). +// +// Tests 'basename' for the Kudu-specific tmp file infix, and if found, +// deletes the file. +static Status DeleteTmpFilesRecursivelyCb(Env* env, + Env::FileType file_type, + const string& dirname, + const string& basename) { + if (file_type != Env::FILE_TYPE) { + // Skip directories. + return Status::OK(); + } + + if (basename.find(kTmpInfix) != string::npos) { + string filename = JoinPathSegments(dirname, basename); + WARN_NOT_OK(env->DeleteFile(filename), + Substitute("Failed to remove temporary file $0", filename)); + } + return Status::OK(); +} + +Status DeleteTmpFilesRecursively(Env* env, const string& path) { + return env->Walk(path, Env::PRE_ORDER, Bind(&DeleteTmpFilesRecursivelyCb, env)); +} + +ScopedFileDeleter::ScopedFileDeleter(Env* env, std::string path) + : env_(DCHECK_NOTNULL(env)), path_(std::move(path)), should_delete_(true) {} + +ScopedFileDeleter::~ScopedFileDeleter() { + if (should_delete_) { + bool is_dir; + Status s = env_->IsDirectory(path_, &is_dir); + WARN_NOT_OK(s, Substitute( + "Failed to determine if path is a directory: $0", path_)); + if (!s.ok()) { + return; + } + if (is_dir) { + WARN_NOT_OK(env_->DeleteDir(path_), + Substitute("Failed to remove directory: $0", path_)); + } else { + WARN_NOT_OK(env_->DeleteFile(path_), + Substitute("Failed to remove file: $0", path_)); + } + } +} + +} // namespace env_util +} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/env_util.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/env_util.h b/be/src/kudu/util/env_util.h new file mode 100644 index 0000000..f93d81b --- /dev/null +++ b/be/src/kudu/util/env_util.h @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_ENV_UTIL_H +#define KUDU_UTIL_ENV_UTIL_H + +#include <memory> +#include <string> + +#include "kudu/gutil/macros.h" +#include "kudu/util/env.h" + +namespace kudu { +namespace env_util { + +Status OpenFileForWrite(Env *env, const std::string &path, + std::shared_ptr<WritableFile> *file); + +Status OpenFileForWrite(const WritableFileOptions& opts, + Env *env, const std::string &path, + std::shared_ptr<WritableFile> *file); + +Status OpenFileForRandom(Env *env, const std::string &path, + std::shared_ptr<RandomAccessFile> *file); + +Status OpenFileForSequential(Env *env, const std::string &path, + std::shared_ptr<SequentialFile> *file); + +// Returns Status::IOError with POSIX code ENOSPC if there is not sufficient +// disk space to write 'bytes' bytes to the file system represented by 'path'. 
+// Otherwise returns OK. +// If 'reserved_bytes' equals -1, it is interpreted as a 1% reservation. No +// other values less than 0 are supported at this time. +Status VerifySufficientDiskSpace(Env *env, const std::string& path, + int64_t requested_bytes, int64_t reserved_bytes); + +// Creates the directory given by 'path', unless it already exists. +// +// If 'created' is not NULL, sets it to true if the directory was +// created, false otherwise. +Status CreateDirIfMissing(Env* env, const std::string& path, + bool* created = NULL); + +// Recursively create directories, if they do not exist, along the given path. +// Returns OK if successful or if the given path already existed. +// Upon failure, it is possible that some part of the directory structure may +// have been successfully created. Emulates the behavior of `mkdir -p`. +Status CreateDirsRecursively(Env* env, const std::string& path); + +// Copy the contents of file source_path to file dest_path. +// This is not atomic, and if there is an error while reading or writing, +// a partial copy may be left in 'dest_path'. Does not fsync the parent +// directory of dest_path -- if you need durability then do that yourself. +Status CopyFile(Env* env, const std::string& source_path, const std::string& dest_path, + WritableFileOptions opts); + +// Deletes files matching 'pattern' in excess of 'max_matches' files. +// 'max_matches' must be greater than or equal to 0. +// The oldest files are deleted first, as determined by last modified time. +// In the case that multiple files have the same last modified time, it is not +// defined which file will be deleted first. +Status DeleteExcessFilesByPattern(Env* env, const std::string& pattern, int max_matches); + +// Traverses 'path' recursively and deletes all files matching the special Kudu +// tmp file infix. Does not follow symlinks. +// +// Deletion errors generate warnings but do not halt the traversal. 
+Status DeleteTmpFilesRecursively(Env* env, const std::string& path); + +// Deletes a file or directory when this object goes out of scope. +// +// The deletion may be cancelled by calling .Cancel(). +// This is typically useful for cleaning up temporary files if the +// creation of the tmp file may fail. +class ScopedFileDeleter { + public: + ScopedFileDeleter(Env* env, std::string path); + ~ScopedFileDeleter(); + + // Do not delete the file when this object goes out of scope. + void Cancel() { + should_delete_ = false; + } + + private: + Env* const env_; + const std::string path_; + bool should_delete_; + + DISALLOW_COPY_AND_ASSIGN(ScopedFileDeleter); +}; + +} // namespace env_util +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/errno-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/errno-test.cc b/be/src/kudu/util/errno-test.cc new file mode 100644 index 0000000..911ca14 --- /dev/null +++ b/be/src/kudu/util/errno-test.cc @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <string> + +#include <gtest/gtest.h> + +#include "kudu/gutil/macros.h" +#include "kudu/util/errno.h" + +using std::string; + +namespace kudu { + +TEST(OsUtilTest, TestErrnoToString) { + int err = ENOENT; + + // Non-truncated result. + ASSERT_EQ("No such file or directory", ErrnoToString(err)); + + // Truncated because of a short buffer. + char buf[2]; + ErrnoToCString(err, buf, arraysize(buf)); + ASSERT_EQ("N", string(buf)); + + // Unknown error. + string expected = "Unknown error"; + ASSERT_EQ(ErrnoToString(-1).compare(0, expected.length(), expected), 0); + + // Unknown error (truncated). + ErrnoToCString(-1, buf, arraysize(buf)); + ASSERT_EQ("U", string(buf)); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/errno.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/errno.cc b/be/src/kudu/util/errno.cc new file mode 100644 index 0000000..65b2173 --- /dev/null +++ b/be/src/kudu/util/errno.cc @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/errno.h" + +#include <errno.h> +#include <string.h> + +#include "kudu/gutil/dynamic_annotations.h" +#include "kudu/util/logging.h" + +namespace kudu { + +void ErrnoToCString(int err, char *buf, size_t buf_len) { + CHECK_GT(buf_len, 0); +#if !defined(__GLIBC__) || \ + ((_POSIX_C_SOURCE >= 200112 || _XOPEN_SOURCE >= 600) && !defined(_GNU_SOURCE)) + // Using POSIX version 'int strerror_r(...)'. + int ret = strerror_r(err, buf, buf_len); + if (ret && ret != ERANGE && ret != EINVAL) { + strncpy(buf, "unknown error", buf_len); + buf[buf_len - 1] = '\0'; + } +#else + // Using GLIBC version + + // KUDU-1515: TSAN in Clang 3.9 has an incorrect interceptor for strerror_r: + // https://github.com/google/sanitizers/issues/696 + ANNOTATE_IGNORE_WRITES_BEGIN(); + char* ret = strerror_r(err, buf, buf_len); + ANNOTATE_IGNORE_WRITES_END(); + if (ret != buf) { + strncpy(buf, ret, buf_len); + buf[buf_len - 1] = '\0'; + } +#endif +} +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/errno.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/errno.h b/be/src/kudu/util/errno.h new file mode 100644 index 0000000..7d5416e --- /dev/null +++ b/be/src/kudu/util/errno.h @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_ERRNO_H +#define KUDU_ERRNO_H + +#include <string> + +namespace kudu { + +void ErrnoToCString(int err, char *buf, size_t buf_len); + +// Return a string representing an errno. +inline static std::string ErrnoToString(int err) { + char buf[512]; + ErrnoToCString(err, buf, sizeof(buf)); + return std::string(buf); +} + +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/failure_detector-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/failure_detector-test.cc b/be/src/kudu/util/failure_detector-test.cc new file mode 100644 index 0000000..306d81a --- /dev/null +++ b/be/src/kudu/util/failure_detector-test.cc @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + + +#include <gtest/gtest.h> +#include <string> + +#include "kudu/gutil/bind.h" +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/failure_detector.h" +#include "kudu/util/locks.h" +#include "kudu/util/monotime.h" +#include "kudu/util/status.h" +#include "kudu/util/test_util.h" + +namespace kudu { + +// How often we expect a node to heartbeat to assert its "aliveness". +static const int kExpectedHeartbeatPeriodMillis = 100; + +// Number of heartbeats after which the FD will consider the node dead. +static const int kMaxMissedHeartbeats = 2; + +// Let's check for failures every 100ms on average +/- 10ms. +static const int kFailureMonitorMeanMillis = 100; +static const int kFailureMonitorStddevMillis = 10; + +static const char* kNodeName = "node-1"; +static const char* kTestTabletName = "test-tablet"; + +class FailureDetectorTest : public KuduTest { + public: + FailureDetectorTest() + : KuduTest(), + latch_(1), + monitor_(new RandomizedFailureMonitor(SeedRandom(), + kFailureMonitorMeanMillis, + kFailureMonitorStddevMillis)) { + } + + void FailureFunction(const std::string& name, const Status& status) { + LOG(INFO) << "Detected failure of " << name; + latch_.CountDown(); + } + + protected: + void WaitForFailure() { + latch_.Wait(); + } + + CountDownLatch latch_; + gscoped_ptr<RandomizedFailureMonitor> monitor_; +}; + +// Tests that we can track a node, that while we notify that we're received messages from +// that node everything is ok and that once we stop doing so the failure detection function +// gets called. 
+TEST_F(FailureDetectorTest, TestDetectsFailure) { + ASSERT_OK(monitor_->Start()); + + scoped_refptr<FailureDetector> detector(new TimedFailureDetector( + MonoDelta::FromMilliseconds(kExpectedHeartbeatPeriodMillis * kMaxMissedHeartbeats))); + + monitor_->MonitorFailureDetector(kTestTabletName, detector); + ASSERT_FALSE(detector->IsTracking(kNodeName)); + ASSERT_OK(detector->Track(kNodeName, + MonoTime::Now(), + Bind(&FailureDetectorTest::FailureFunction, Unretained(this)))); + ASSERT_TRUE(detector->IsTracking(kNodeName)); + + const int kNumPeriodsToWait = 4; // Num heartbeat periods to wait for a failure. + const int kUpdatesPerPeriod = 10; // Num updates we give per period to minimize test flakiness. + + for (int i = 0; i < kNumPeriodsToWait * kUpdatesPerPeriod; i++) { + // Report in (heartbeat) to the detector. + ASSERT_OK(detector->MessageFrom(kNodeName, MonoTime::Now())); + + // We sleep for a fraction of heartbeat period, to minimize test flakiness. + SleepFor(MonoDelta::FromMilliseconds(kExpectedHeartbeatPeriodMillis / kUpdatesPerPeriod)); + + // The latch shouldn't have counted down, since the node's been reporting that + // it's still alive. + ASSERT_EQ(1, latch_.count()); + } + + // If we stop reporting he node is alive the failure callback is eventually + // triggered and we exit. 
+ WaitForFailure(); + + ASSERT_OK(detector->UnTrack(kNodeName)); + ASSERT_FALSE(detector->IsTracking(kNodeName)); + + ASSERT_OK(monitor_->UnmonitorFailureDetector(kTestTabletName)); + monitor_->Shutdown(); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/failure_detector.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/failure_detector.cc b/be/src/kudu/util/failure_detector.cc new file mode 100644 index 0000000..510cbca --- /dev/null +++ b/be/src/kudu/util/failure_detector.cc @@ -0,0 +1,214 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/failure_detector.h" + +#include <glog/logging.h> +#include <mutex> +#include <unordered_map> + +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/stl_util.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/locks.h" +#include "kudu/util/random_util.h" +#include "kudu/util/status.h" +#include "kudu/util/thread.h" + +namespace kudu { + +using std::unordered_map; +using strings::Substitute; + +const int64_t RandomizedFailureMonitor::kMinWakeUpTimeMillis = 10; + +TimedFailureDetector::TimedFailureDetector(MonoDelta failure_period) + : failure_period_(std::move(failure_period)) {} + +TimedFailureDetector::~TimedFailureDetector() { + STLDeleteValues(&nodes_); +} + +Status TimedFailureDetector::Track(const string& name, + const MonoTime& now, + const FailureDetectedCallback& callback) { + std::lock_guard<simple_spinlock> lock(lock_); + gscoped_ptr<Node> node(new Node); + node->permanent_name = name; + node->callback = callback; + node->last_heard_of = now; + node->status = ALIVE; + if (!InsertIfNotPresent(&nodes_, name, node.get())) { + return Status::AlreadyPresent( + Substitute("Node with name '$0' is already being monitored", name)); + } + ignore_result(node.release()); + return Status::OK(); +} + +Status TimedFailureDetector::UnTrack(const string& name) { + std::lock_guard<simple_spinlock> lock(lock_); + Node* node = EraseKeyReturnValuePtr(&nodes_, name); + if (PREDICT_FALSE(node == NULL)) { + return Status::NotFound(Substitute("Node with name '$0' not found", name)); + } + delete node; + return Status::OK(); +} + +bool TimedFailureDetector::IsTracking(const std::string& name) { + std::lock_guard<simple_spinlock> lock(lock_); + return ContainsKey(nodes_, name); +} + +Status TimedFailureDetector::MessageFrom(const std::string& name, const MonoTime& now) { + VLOG(3) << "Received message from " << name << " at " << now.ToString(); + std::lock_guard<simple_spinlock> lock(lock_); + Node* node = FindPtrOrNull(nodes_, name); + if 
(node == NULL) { + VLOG(1) << "Not tracking node: " << name; + return Status::NotFound(Substitute("Message from unknown node '$0'", name)); + } + node->last_heard_of = now; + node->status = ALIVE; + return Status::OK(); +} + +FailureDetector::NodeStatus TimedFailureDetector::GetNodeStatusUnlocked(const std::string& name, + const MonoTime& now) { + Node* node = FindOrDie(nodes_, name); + if ((now - node->last_heard_of) > failure_period_) { + node->status = DEAD; + } + return node->status; +} + +void TimedFailureDetector::CheckForFailures(const MonoTime& now) { + typedef unordered_map<string, FailureDetectedCallback> CallbackMap; + CallbackMap callbacks; + { + std::lock_guard<simple_spinlock> lock(lock_); + for (const NodeMap::value_type& entry : nodes_) { + if (GetNodeStatusUnlocked(entry.first, now) == DEAD) { + InsertOrDie(&callbacks, entry.first, entry.second->callback); + } + } + } + // Invoke failure callbacks outside of lock. + for (const CallbackMap::value_type& entry : callbacks) { + const string& node_name = entry.first; + const FailureDetectedCallback& callback = entry.second; + callback.Run(node_name, Status::RemoteError(Substitute("Node '$0' failed", node_name))); + } +} + +RandomizedFailureMonitor::RandomizedFailureMonitor(uint32_t random_seed, + int64_t period_mean_millis, + int64_t period_stddev_millis) + : period_mean_millis_(period_mean_millis), + period_stddev_millis_(period_stddev_millis), + random_(random_seed), + run_latch_(0), + shutdown_(false) { +} + +RandomizedFailureMonitor::~RandomizedFailureMonitor() { + Shutdown(); +} + +Status RandomizedFailureMonitor::Start() { + CHECK(!thread_); + run_latch_.Reset(1); + return Thread::Create("failure-monitors", "failure-monitor", + &RandomizedFailureMonitor::RunThread, + this, &thread_); +} + +void RandomizedFailureMonitor::Shutdown() { + if (!thread_) { + return; + } + + { + std::lock_guard<simple_spinlock> l(lock_); + if (shutdown_) { + return; + } + shutdown_ = true; + } + + run_latch_.CountDown(); 
+ CHECK_OK(ThreadJoiner(thread_.get()).Join()); + thread_.reset(); +} + +Status RandomizedFailureMonitor::MonitorFailureDetector(const string& name, + const scoped_refptr<FailureDetector>& fd) { + std::lock_guard<simple_spinlock> l(lock_); + bool inserted = InsertIfNotPresent(&fds_, name, fd); + if (PREDICT_FALSE(!inserted)) { + return Status::AlreadyPresent(Substitute("Already monitoring failure detector '$0'", name)); + } + return Status::OK(); +} + +Status RandomizedFailureMonitor::UnmonitorFailureDetector(const string& name) { + std::lock_guard<simple_spinlock> l(lock_); + int count = fds_.erase(name); + if (PREDICT_FALSE(count == 0)) { + return Status::NotFound(Substitute("Failure detector '$0' not found", name)); + } + return Status::OK(); +} + +void RandomizedFailureMonitor::RunThread() { + VLOG(1) << "Failure monitor thread starting"; + + while (true) { + int64_t wait_millis = random_.Normal(period_mean_millis_, period_stddev_millis_); + if (wait_millis < kMinWakeUpTimeMillis) { + wait_millis = kMinWakeUpTimeMillis; + } + + MonoDelta wait_delta = MonoDelta::FromMilliseconds(wait_millis); + VLOG(3) << "RandomizedFailureMonitor sleeping for: " << wait_delta.ToString(); + if (run_latch_.WaitFor(wait_delta)) { + // CountDownLatch reached 0. + std::lock_guard<simple_spinlock> lock(lock_); + // Check if we were told to shutdown. + if (shutdown_) { + // Latch fired: exit loop. + VLOG(1) << "RandomizedFailureMonitor thread shutting down"; + return; + } + } + + // Take a copy of the FD map under the lock. 
+ FDMap fds_copy; + { + std::lock_guard<simple_spinlock> l(lock_); + fds_copy = fds_; + } + + MonoTime now = MonoTime::Now(); + for (const FDMap::value_type& entry : fds_copy) { + entry.second->CheckForFailures(now); + } + } +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/failure_detector.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/failure_detector.h b/be/src/kudu/util/failure_detector.h new file mode 100644 index 0000000..0290011 --- /dev/null +++ b/be/src/kudu/util/failure_detector.h @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef KUDU_UTIL_FAILURE_DETECTOR_H_ +#define KUDU_UTIL_FAILURE_DETECTOR_H_ + +#include <string> +#include <unordered_map> + +#include "kudu/gutil/callback.h" +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/macros.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/monotime.h" +#include "kudu/util/locks.h" +#include "kudu/util/random.h" +#include "kudu/util/status_callback.h" + +namespace kudu { +class MonoDelta; +class MonoTime; +class Status; +class Thread; + +// A generic interface for failure detector implementations. +// A failure detector is responsible for deciding whether a certain server is dead or alive. +class FailureDetector : public RefCountedThreadSafe<FailureDetector> { + public: + enum NodeStatus { + DEAD, + ALIVE + }; + typedef std::unordered_map<std::string, NodeStatus> StatusMap; + + typedef Callback<void(const std::string& name, + const Status& status)> FailureDetectedCallback; + + virtual ~FailureDetector() {} + + // Registers a node with 'name' in the failure detector. + // + // If it returns Status::OK() the failure detector will from now + // expect messages from the machine with 'name' and will trigger + // 'callback' if a failure is detected. + // + // Returns Status::AlreadyPresent() if a machine with 'name' is + // already registered in this failure detector. + virtual Status Track(const std::string& name, + const MonoTime& now, + const FailureDetectedCallback& callback) = 0; + + // Stops tracking node with 'name'. + virtual Status UnTrack(const std::string& name) = 0; + + // Return true iff the named entity is currently being tracked. + virtual bool IsTracking(const std::string& name) = 0; + + // Records that a message from machine with 'name' was received at 'now'. + virtual Status MessageFrom(const std::string& name, const MonoTime& now) = 0; + + // Checks the failure status of each tracked node. If the failure criteria is + // met, the failure callback is invoked. 
+ virtual void CheckForFailures(const MonoTime& now) = 0; +}; + +// A simple failure detector implementation that considers a node dead +// when they have not reported by a certain time interval. +class TimedFailureDetector : public FailureDetector { + public: + // Some monitorable entity. + struct Node { + std::string permanent_name; + MonoTime last_heard_of; + FailureDetectedCallback callback; + NodeStatus status; + }; + + explicit TimedFailureDetector(MonoDelta failure_period); + virtual ~TimedFailureDetector(); + + virtual Status Track(const std::string& name, + const MonoTime& now, + const FailureDetectedCallback& callback) OVERRIDE; + + virtual Status UnTrack(const std::string& name) OVERRIDE; + + virtual bool IsTracking(const std::string& name) OVERRIDE; + + virtual Status MessageFrom(const std::string& name, const MonoTime& now) OVERRIDE; + + virtual void CheckForFailures(const MonoTime& now) OVERRIDE; + + private: + typedef std::unordered_map<std::string, Node*> NodeMap; + + // Check if the named failure detector has failed. + // Does not invoke the callback. + FailureDetector::NodeStatus GetNodeStatusUnlocked(const std::string& name, + const MonoTime& now); + + const MonoDelta failure_period_; + mutable simple_spinlock lock_; + NodeMap nodes_; + + DISALLOW_COPY_AND_ASSIGN(TimedFailureDetector); +}; + +// A randomized failure monitor that wakes up in normally-distributed intervals +// and runs CheckForFailures() on each failure detector it monitors. +// +// The wake up interval is defined by a normal distribution with the specified +// mean and standard deviation, in milliseconds, with minimum possible value +// pinned at kMinWakeUpTimeMillis. +// +// We use a random wake up interval to avoid thundering herd / lockstep problems +// when multiple nodes react to the failure of another node. +class RandomizedFailureMonitor { + public: + // The minimum time the FailureMonitor will wait. 
+ static const int64_t kMinWakeUpTimeMillis; + + RandomizedFailureMonitor(uint32_t random_seed, + int64_t period_mean_millis, + int64_t period_std_dev_millis); + ~RandomizedFailureMonitor(); + + // Starts the failure monitor. + Status Start(); + + // Stops the failure monitor. + void Shutdown(); + + // Adds a failure detector to be monitored. + Status MonitorFailureDetector(const std::string& name, + const scoped_refptr<FailureDetector>& fd); + + // Unmonitors the failure detector with the specified name. + Status UnmonitorFailureDetector(const std::string& name); + + private: + typedef std::unordered_map<std::string, scoped_refptr<FailureDetector> > FDMap; + + // Runs the monitor thread. + void RunThread(); + + // Mean & std. deviation of random period to sleep for between checking the + // failure detectors. + const int64_t period_mean_millis_; + const int64_t period_stddev_millis_; + ThreadSafeRandom random_; + + scoped_refptr<Thread> thread_; + CountDownLatch run_latch_; + + mutable simple_spinlock lock_; + FDMap fds_; + bool shutdown_; // Whether the failure monitor should shut down. + + DISALLOW_COPY_AND_ASSIGN(RandomizedFailureMonitor); +}; + +} // namespace kudu + +#endif /* KUDU_UTIL_FAILURE_DETECTOR_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/faststring-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/faststring-test.cc b/be/src/kudu/util/faststring-test.cc new file mode 100644 index 0000000..c57cb41 --- /dev/null +++ b/be/src/kudu/util/faststring-test.cc @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <algorithm> +#include "kudu/util/faststring.h" +#include "kudu/util/random.h" +#include "kudu/util/random_util.h" +#include "kudu/util/test_util.h" + +namespace kudu { +class FaststringTest : public KuduTest {}; + +TEST_F(FaststringTest, TestShrinkToFit_Empty) { + faststring s; + s.shrink_to_fit(); + ASSERT_EQ(faststring::kInitialCapacity, s.capacity()); +} + +// Test that, if the string contents is shorter than the initial capacity +// of the faststring, shrink_to_fit() leaves the string in the built-in +// array. 
+TEST_F(FaststringTest, TestShrinkToFit_SmallerThanInitialCapacity) { + faststring s; + s.append("hello"); + s.shrink_to_fit(); + ASSERT_EQ(faststring::kInitialCapacity, s.capacity()); +} + +TEST_F(FaststringTest, TestShrinkToFit_Random) { + Random r(GetRandomSeed32()); + int kMaxSize = faststring::kInitialCapacity * 2; + std::unique_ptr<char[]> random_bytes(new char[kMaxSize]); + RandomString(random_bytes.get(), kMaxSize, &r); + + faststring s; + for (int i = 0; i < 100; i++) { + int new_size = r.Uniform(kMaxSize); + s.resize(new_size); + memcpy(s.data(), random_bytes.get(), new_size); + s.shrink_to_fit(); + ASSERT_EQ(0, memcmp(s.data(), random_bytes.get(), new_size)); + ASSERT_EQ(std::max<int>(faststring::kInitialCapacity, new_size), s.capacity()); + } +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/faststring.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/faststring.cc b/be/src/kudu/util/faststring.cc new file mode 100644 index 0000000..a1cd26b --- /dev/null +++ b/be/src/kudu/util/faststring.cc @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/faststring.h" + +#include <glog/logging.h> +#include <memory> + +namespace kudu { + +void faststring::GrowByAtLeast(size_t count) { +  // Not enough space, need to reserve more. +  // Don't reserve exactly enough space for the new string -- that makes it +  // too easy to write perf bugs where you get O(n^2) append. +  // Instead, always expand by at least 50%. + +  size_t to_reserve = len_ + count; +  if (len_ + count < len_ * 3 / 2) { +    to_reserve = len_ *  3 / 2; +  } +  GrowArray(to_reserve); +} + +void faststring::GrowArray(size_t newcapacity) { +  DCHECK_GE(newcapacity, capacity_); +  std::unique_ptr<uint8_t[]> newdata(new uint8_t[newcapacity]); +  if (len_ > 0) { +    memcpy(&newdata[0], &data_[0], len_); +  } +  capacity_ = newcapacity; +  if (data_ != initial_data_) { +    delete[] data_; +  } else { +    ASAN_POISON_MEMORY_REGION(initial_data_, arraysize(initial_data_)); +  } + +  data_ = newdata.release(); +  ASAN_POISON_MEMORY_REGION(data_ + len_, capacity_ - len_); +} + +void faststring::ShrinkToFitInternal() { +  DCHECK_NE(data_, initial_data_); +  if (len_ <= kInitialCapacity) { +    ASAN_UNPOISON_MEMORY_REGION(initial_data_, len_); +    memcpy(initial_data_, &data_[0], len_); +    delete[] data_; +    data_ = initial_data_; +    capacity_ = kInitialCapacity; +  } else { +    std::unique_ptr<uint8_t[]> newdata(new uint8_t[len_]); +    memcpy(&newdata[0], &data_[0], len_); +    delete[] data_; +    data_ = newdata.release(); +    capacity_ = len_; +  } +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/faststring.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/faststring.h b/be/src/kudu/util/faststring.h new file mode 100644 index 0000000..3d25c84 --- /dev/null +++ b/be/src/kudu/util/faststring.h @@ -0,0 +1,256 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_FASTSTRING_H +#define KUDU_UTIL_FASTSTRING_H + +#include <string> + +#include "kudu/gutil/dynamic_annotations.h" +#include "kudu/gutil/macros.h" +#include "kudu/gutil/strings/fastmem.h" + +namespace kudu { + +// A faststring is similar to a std::string, except that it is faster for many +// common use cases (in particular, resize() will fill with uninitialized data +// instead of memsetting to \0) +class faststring { + public: + enum { + kInitialCapacity = 32 + }; + + faststring() : + data_(initial_data_), + len_(0), + capacity_(kInitialCapacity) { + } + + // Construct a string with the given capacity, in bytes. + explicit faststring(size_t capacity) + : data_(initial_data_), + len_(0), + capacity_(kInitialCapacity) { + if (capacity > capacity_) { + data_ = new uint8_t[capacity]; + capacity_ = capacity; + } + ASAN_POISON_MEMORY_REGION(data_, capacity_); + } + + ~faststring() { + ASAN_UNPOISON_MEMORY_REGION(initial_data_, arraysize(initial_data_)); + if (data_ != initial_data_) { + delete[] data_; + } + } + + // Reset the valid length of the string to 0. + // + // This does not free up any memory. The capacity of the string remains unchanged. 
+  void clear() { +    resize(0); +    ASAN_POISON_MEMORY_REGION(data_, capacity_); +  } + +  // Resize the string to the given length. +  // If the new length is larger than the old length, the capacity is expanded as necessary. +  // +  // NOTE: in contrast to std::string's implementation, any newly "exposed" bytes of data are +  // not cleared. +  void resize(size_t newsize) { +    if (newsize > capacity_) { +      reserve(newsize); +    } +    len_ = newsize; +    ASAN_POISON_MEMORY_REGION(data_ + len_, capacity_ - len_); +    ASAN_UNPOISON_MEMORY_REGION(data_, len_); +  } + +  // Releases the underlying array; after this, the buffer is left empty. +  // +  // NOTE: the returned pointer may differ from data(): contents held in the inline buffer are copied to a new heap array. The caller owns the result and must free it with delete[]. +  uint8_t *release() WARN_UNUSED_RESULT { +    uint8_t *ret = data_; +    if (ret == initial_data_) { +      ret = new uint8_t[len_]; +      memcpy(ret, data_, len_); +    } +    len_ = 0; +    capacity_ = kInitialCapacity; +    data_ = initial_data_; +    ASAN_POISON_MEMORY_REGION(data_, capacity_); +    return ret; +  } + +  // Reserve space for the given total amount of data. If the current capacity is already +  // larger than the newly requested capacity, this is a no-op (i.e. it does not ever free memory). +  // +  // NOTE: even though the new capacity is reserved, it is illegal to begin writing into that memory +  // directly using pointers. If ASAN is enabled, this is ensured using manual memory poisoning. +  void reserve(size_t newcapacity) { +    if (PREDICT_TRUE(newcapacity <= capacity_)) return; +    GrowArray(newcapacity); +  } + +  // Append the given data to the string, resizing capacity as necessary. +  void append(const void *src_v, size_t count) { +    const uint8_t *src = reinterpret_cast<const uint8_t *>(src_v); +    EnsureRoomForAppend(count); +    ASAN_UNPOISON_MEMORY_REGION(data_ + len_, count); + +    // appending short values is common enough that this +    // actually helps, according to benchmarks.
In theory + // memcpy_inlined should already be just as good, but this + // was ~20% faster for reading a large prefix-coded string file + // where each string was only a few chars different + if (count <= 4) { + uint8_t *p = &data_[len_]; + for (int i = 0; i < count; i++) { + *p++ = *src++; + } + } else { + strings::memcpy_inlined(&data_[len_], src, count); + } + len_ += count; + } + + // Append the given string to this string. + void append(const std::string &str) { + append(str.data(), str.size()); + } + + // Append the given character to this string. + void push_back(const char byte) { + EnsureRoomForAppend(1); + ASAN_UNPOISON_MEMORY_REGION(data_ + len_, 1); + data_[len_] = byte; + len_++; + } + + // Return the valid length of this string. + size_t length() const { + return len_; + } + + // Return the valid length of this string (identical to length()) + size_t size() const { + return len_; + } + + // Return the allocated capacity of this string. + size_t capacity() const { + return capacity_; + } + + // Return a pointer to the data in this string. Note that this pointer + // may be invalidated by any later non-const operation. + const uint8_t *data() const { + return &data_[0]; + } + + // Return a pointer to the data in this string. Note that this pointer + // may be invalidated by any later non-const operation. + uint8_t *data() { + return &data_[0]; + } + + // Return the given element of this string. Note that this does not perform + // any bounds checking. + const uint8_t &at(size_t i) const { + return data_[i]; + } + + // Return the given element of this string. Note that this does not perform + // any bounds checking. + const uint8_t &operator[](size_t i) const { + return data_[i]; + } + + // Return the given element of this string. Note that this does not perform + // any bounds checking. + uint8_t &operator[](size_t i) { + return data_[i]; + } + + // Reset the contents of this string by copying 'len' bytes from 'src'. 
+ void assign_copy(const uint8_t *src, size_t len) { + // Reset length so that the first resize doesn't need to copy the current + // contents of the array. + len_ = 0; + resize(len); + memcpy(data(), src, len); + } + + // Reset the contents of this string by copying from the given std::string. + void assign_copy(const std::string &str) { + assign_copy(reinterpret_cast<const uint8_t *>(str.c_str()), + str.size()); + } + + // Reallocates the internal storage to fit only the current data. + // + // This may revert to using internal storage if the current length is shorter than + // kInitialCapacity. Note that, in that case, after this call, capacity() will return + // a capacity larger than the data length. + // + // Any pointers within this instance are invalidated. + void shrink_to_fit() { + if (data_ == initial_data_ || capacity_ == len_) return; + ShrinkToFitInternal(); + } + + // Return a copy of this string as a std::string. + std::string ToString() const { + return std::string(reinterpret_cast<const char *>(data()), + len_); + } + + private: + DISALLOW_COPY_AND_ASSIGN(faststring); + + // If necessary, expand the buffer to fit at least 'count' more bytes. + // If the array has to be grown, it is grown by at least 50%. + void EnsureRoomForAppend(size_t count) { + if (PREDICT_TRUE(len_ + count <= capacity_)) { + return; + } + + // Call the non-inline slow path - this reduces the number of instructions + // on the hot path. + GrowByAtLeast(count); + } + + // The slow path of MakeRoomFor. Grows the buffer by either + // 'count' bytes, or 50%, whichever is more. + void GrowByAtLeast(size_t count); + + // Grow the array to the given capacity, which must be more than + // the current capacity. 
+ void GrowArray(size_t newcapacity); + + void ShrinkToFitInternal(); + + uint8_t* data_; + uint8_t initial_data_[kInitialCapacity]; + size_t len_; + size_t capacity_; +}; + +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/fault_injection.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/fault_injection.cc b/be/src/kudu/util/fault_injection.cc new file mode 100644 index 0000000..a14c8f3 --- /dev/null +++ b/be/src/kudu/util/fault_injection.cc @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/fault_injection.h" + +#include <stdlib.h> +#include <sys/time.h> + +#include "kudu/gutil/once.h" +#include "kudu/util/debug/leakcheck_disabler.h" +#include "kudu/util/monotime.h" +#include "kudu/util/os-util.h" +#include "kudu/util/random.h" +#include "kudu/util/random_util.h" + +namespace kudu { +namespace fault_injection { + +namespace { +GoogleOnceType g_random_once; +Random* g_random; + +void InitRandom() { +  LOG(WARNING) << "FAULT INJECTION ENABLED!"; +  LOG(WARNING) << "THIS SERVER MAY CRASH!"; + +  debug::ScopedLeakCheckDisabler d; +  g_random = new Random(GetRandomSeed32()); +  ANNOTATE_BENIGN_RACE_SIZED(g_random, sizeof(Random), +                             "Racy random numbers are OK"); +} + +} // anonymous namespace + +void DoMaybeFault(const char* fault_str, double fraction) { +  GoogleOnceInit(&g_random_once, InitRandom); +  if (PREDICT_TRUE(g_random->NextDoubleFraction() >= fraction)) { +    return; +  } +  LOG(ERROR) << "Injecting fault: " << fault_str << " (process will exit)"; +  // _exit will exit the program without running atexit handlers. This more +  // accurately simulates a crash.
+ _exit(kExitStatus); +} + +void DoInjectRandomLatency(double max_latency_ms) { + GoogleOnceInit(&g_random_once, InitRandom); + SleepFor(MonoDelta::FromMilliseconds(g_random->NextDoubleFraction() * max_latency_ms)); +} + +Status DoMaybeReturnFailure(double fraction, + const Status& bad_status_to_return) { + GoogleOnceInit(&g_random_once, InitRandom); + if (PREDICT_TRUE(g_random->NextDoubleFraction() >= fraction)) { + return Status::OK(); + } + return bad_status_to_return; +} + +} // namespace fault_injection +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/fault_injection.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/fault_injection.h b/be/src/kudu/util/fault_injection.h new file mode 100644 index 0000000..cc22bab --- /dev/null +++ b/be/src/kudu/util/fault_injection.h @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_FAULT_INJECTION_H +#define KUDU_UTIL_FAULT_INJECTION_H + +#include "kudu/gutil/macros.h" +#include "kudu/util/status.h" + +// Macros for injecting various kinds of faults with varying probability. 
If +// configured with 0 probability, each of these macros is evaluated inline and +// is fast enough to run even in hot code paths. + +// With some probability, crash at the current point in the code +// by logging an error and calling _exit(kExitStatus), skipping atexit handlers. +// +// The probability is determined by the 'fraction_flag' argument. +// +// Typical usage: +// +//   DEFINE_double(fault_crash_before_foo, 0.0, +//                 "Fraction of the time when we will crash before doing foo"); +//   TAG_FLAG(fault_crash_before_foo, unsafe); +#define MAYBE_FAULT(fraction_flag) \ +  kudu::fault_injection::MaybeFault(AS_STRING(fraction_flag), fraction_flag) + +// Inject a uniformly random amount of latency between 0 and the configured +// number of milliseconds. +#define MAYBE_INJECT_RANDOM_LATENCY(max_ms_flag) \ +  kudu::fault_injection::MaybeInjectRandomLatency(max_ms_flag) + +// With some probability, return the failure described by 'status_expr'. +// +// Unlike the other MAYBE_ macros, this one tests 'fraction_flag' inline, so +// 'status_expr' is only evaluated when 'fraction_flag' is positive. The +// expansion declares no locals, so it may be used more than once per scope. +#define MAYBE_RETURN_FAILURE(fraction_flag, status_expr) \ +  do { if (PREDICT_FALSE((fraction_flag) > 0)) { \ +    RETURN_NOT_OK(kudu::fault_injection::MaybeReturnFailure((fraction_flag), (status_expr))); } } while (0); + +// Implementation details below. +// Use the MAYBE_* macros above instead. +namespace kudu { +namespace fault_injection { + +// The exit status returned from a process exiting due to a fault. +// The choice of value here is arbitrary: it just needs to be something that +// wouldn't normally be returned by a non-fault-injection code path. +constexpr int kExitStatus = 85; + +// Out-of-line implementation.
+void DoMaybeFault(const char* fault_str, double fraction); +void DoInjectRandomLatency(double max_latency_ms); +Status DoMaybeReturnFailure(double fraction, + const Status& bad_status_to_return); + +inline void MaybeFault(const char* fault_str, double fraction) { + if (PREDICT_TRUE(fraction <= 0)) return; + DoMaybeFault(fault_str, fraction); +} + +inline void MaybeInjectRandomLatency(double max_latency) { + if (PREDICT_TRUE(max_latency <= 0)) return; + DoInjectRandomLatency(max_latency); +} + +inline Status MaybeReturnFailure(double fraction, + const Status& bad_status_to_return) { + if (PREDICT_TRUE(fraction <= 0)) return Status::OK(); + return DoMaybeReturnFailure(fraction, bad_status_to_return); +} + +} // namespace fault_injection +} // namespace kudu +#endif /* KUDU_UTIL_FAULT_INJECTION_H */ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/file_cache-stress-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/file_cache-stress-test.cc b/be/src/kudu/util/file_cache-stress-test.cc new file mode 100644 index 0000000..966f202 --- /dev/null +++ b/be/src/kudu/util/file_cache-stress-test.cc @@ -0,0 +1,390 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/file_cache.h" + +#include <deque> +#include <iterator> +#include <memory> +#include <mutex> +#include <string> +#include <thread> +#include <unordered_map> +#include <vector> + +#include <gflags/gflags.h> +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/strings/split.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/env.h" +#include "kudu/util/file_cache-test-util.h" +#include "kudu/util/locks.h" +#include "kudu/util/logging.h" +#include "kudu/util/metrics.h" +#include "kudu/util/monotime.h" +#include "kudu/util/oid_generator.h" +#include "kudu/util/path_util.h" +#include "kudu/util/random.h" +#include "kudu/util/status.h" +#include "kudu/util/test_util.h" + +// Like CHECK_OK(), but dumps the contents of the cache before failing. +// +// The output of ToDebugString() tends to be long enough that LOG() truncates +// it, so we must split it ourselves before logging. +#define TEST_CHECK_OK(to_call) do { \ + const Status& _s = (to_call); \ + if (!_s.ok()) { \ + LOG(INFO) << "Dumping cache contents"; \ + vector<string> lines = strings::Split(cache_.ToDebugString(), "\n", \ + strings::SkipEmpty()); \ + for (const auto& l : lines) { \ + LOG(INFO) << l; \ + } \ + } \ + CHECK(_s.ok()) << "Bad status: " << _s.ToString(); \ + } while (0); + +// This default value is friendly to many n-CPU configurations. +DEFINE_int32(test_max_open_files, 192, "Maximum number of open files enforced " + "by the cache. 
Should be a multiple of the number of CPUs on the " + "system."); + +DEFINE_int32(test_num_producer_threads, 1, "Number of producer threads"); +DEFINE_int32(test_num_consumer_threads, 4, "Number of consumer threads"); +DEFINE_int32(test_duration_secs, 2, "Number of seconds to run the test"); + +using std::deque; +using std::shared_ptr; +using std::thread; +using std::unique_ptr; +using std::unordered_map; +using std::vector; +using strings::Substitute; + +namespace kudu { + +template <class FileType> +class FileCacheStressTest : public KuduTest { + public: + typedef unordered_map<string, unordered_map<string, int>> MetricMap; + + FileCacheStressTest() + : cache_("test", + env_, + FLAGS_test_max_open_files, + scoped_refptr<MetricEntity>()), + rand_(SeedRandom()), + running_(1) { + } + + void SetUp() override { + ASSERT_OK(cache_.Init()); + } + + void ProducerThread() { + Random rand(rand_.Next32()); + ObjectIdGenerator oid_generator; + MetricMap metrics; + + do { + // Create a new file with some (0-32k) random data in it. + string next_file_name = GetTestPath(oid_generator.Next()); + { + unique_ptr<WritableFile> next_file; + CHECK_OK(env_->NewWritableFile(next_file_name, &next_file)); + uint8_t buf[rand.Uniform((32 * 1024) - 1) + 1]; + CHECK_OK(next_file->Append(GenerateRandomChunk(buf, sizeof(buf), &rand))); + CHECK_OK(next_file->Close()); + } + { + std::lock_guard<simple_spinlock> l(lock_); + InsertOrDie(&available_files_, next_file_name, 0); + } + metrics[BaseName(next_file_name)]["create"] = 1; + } while (!running_.WaitFor(MonoDelta::FromMilliseconds(1))); + + // Update the global metrics map. + MergeNewMetrics(std::move(metrics)); + } + + void ConsumerThread() { + // Each thread has its own PRNG to minimize contention on the main one. + Random rand(rand_.Next32()); + + // Active opened files in this thread. + deque<shared_ptr<FileType>> files; + + // Metrics generated by this thread. They will be merged into the main + // metrics map when the thread is done. 
+ MetricMap metrics; + + do { + // Pick an action to perform. Distribution: + // 20% open + // 15% close + // 35% read + // 20% write + // 10% delete + int next_action = rand.Uniform(100); + + if (next_action < 20) { + // Open an existing file. + string to_open; + if (!GetRandomFile(OPEN, &rand, &to_open)) { + continue; + } + shared_ptr<FileType> new_file; + TEST_CHECK_OK(cache_.OpenExistingFile(to_open, &new_file)); + FinishedOpen(to_open); + metrics[BaseName(to_open)]["open"]++; + files.emplace_back(new_file); + } else if (next_action < 35) { + // Close a file. + if (files.empty()) { + continue; + } + shared_ptr<FileType> file = files.front(); + files.pop_front(); + metrics[BaseName(file->filename())]["close"]++; + } else if (next_action < 70) { + // Read a random chunk from a file. + TEST_CHECK_OK(ReadRandomChunk(files, &metrics, &rand)); + } else if (next_action < 90) { + // Write a random chunk to a file. + TEST_CHECK_OK(WriteRandomChunk(files, &metrics, &rand)); + } else if (next_action < 100) { + // Delete a file. + string to_delete; + if (!GetRandomFile(DELETE, &rand, &to_delete)) { + continue; + } + TEST_CHECK_OK(cache_.DeleteFile(to_delete)); + metrics[BaseName(to_delete)]["delete"]++; + } + } while (!running_.WaitFor(MonoDelta::FromMilliseconds(1))); + + // Update the global metrics map. + MergeNewMetrics(std::move(metrics)); + } + + protected: + void NotifyThreads() { running_.CountDown(); } + + const MetricMap& metrics() const { return metrics_; } + + private: + enum GetMode { + OPEN, + DELETE + }; + + // Retrieve a random file name to be either opened or deleted. If deleting, + // the file name is made inaccessible to future operations. + bool GetRandomFile(GetMode mode, Random* rand, string* out) { + std::lock_guard<simple_spinlock> l(lock_); + if (available_files_.empty()) { + return false; + } + + // This is linear time, but it's simpler than managing multiple data + // structures. 
+ auto it = available_files_.begin(); + std::advance(it, rand->Uniform(available_files_.size())); + + // It's unsafe to delete a file that is still being opened. + if (mode == DELETE && it->second > 0) { + return false; + } + + *out = it->first; + if (mode == OPEN) { + it->second++; + } else { + available_files_.erase(it); + } + return true; + } + + // Signal that a previously in-progress open has finished, allowing the file + // in question to be deleted. + void FinishedOpen(const string& opened) { + std::lock_guard<simple_spinlock> l(lock_); + int& openers = FindOrDie(available_files_, opened); + openers--; + } + + // Reads a random chunk of data from a random file in 'files'. On success, + // writes to 'metrics'. + static Status ReadRandomChunk(const deque<shared_ptr<FileType>>& files, + MetricMap* metrics, + Random* rand) { + if (files.empty()) { + return Status::OK(); + } + const shared_ptr<FileType>& file = files[rand->Uniform(files.size())]; + + uint64_t file_size; + RETURN_NOT_OK(file->Size(&file_size)); + uint64_t off = file_size > 0 ? rand->Uniform(file_size) : 0; + size_t len = file_size > 0 ? rand->Uniform(file_size - off) : 0; + unique_ptr<uint8_t[]> scratch(new uint8_t[len]); + Slice s(scratch.get(), len); + RETURN_NOT_OK(file->Read(off, &s)); + + (*metrics)[BaseName(file->filename())]["read"]++; + return Status::OK(); + } + + // Writes a random chunk of data to a random file in 'files'. On success, + // updates 'metrics'. + // + // No-op for file implementations that don't support writing. 
+ static Status WriteRandomChunk(const deque<shared_ptr<FileType>>& files, + MetricMap* metrics, + Random* rand); + + static Slice GenerateRandomChunk(uint8_t* buffer, size_t max_length, Random* rand) { + size_t len = rand->Uniform(max_length); + len -= len % sizeof(uint32_t); + for (int i = 0; i < (len / sizeof(uint32_t)); i += sizeof(uint32_t)) { + reinterpret_cast<uint32_t*>(buffer)[i] = rand->Next32(); + } + return Slice(buffer, len); + } + + // Merge the metrics in 'new_metrics' into the global metric map. + void MergeNewMetrics(MetricMap new_metrics) { + std::lock_guard<simple_spinlock> l(lock_); + for (const auto& file_action_pair : new_metrics) { + for (const auto& action_count_pair : file_action_pair.second) { + metrics_[file_action_pair.first][action_count_pair.first] += action_count_pair.second; + } + } + } + + FileCache<FileType> cache_; + + // Used to seed per-thread PRNGs. + ThreadSafeRandom rand_; + + // Drops to zero when the test ends. + CountDownLatch running_; + + // Protects 'available_files_' and 'metrics_'. + simple_spinlock lock_; + + // Contains files produced by producer threads and ready for consumption by + // consumer threads. + // + // Each entry is a file name and the number of in-progress openers. To delete + // a file, there must be no openers. + unordered_map<string, int> available_files_; + + // For each file name, tracks the count of consumer actions performed. + // + // Only updated at test end. + MetricMap metrics_; +}; + +template <> +Status FileCacheStressTest<RWFile>::WriteRandomChunk( + const deque<shared_ptr<RWFile>>& files, + MetricMap* metrics, + Random* rand) { + if (files.empty()) { + return Status::OK(); + } + const shared_ptr<RWFile>& file = files[rand->Uniform(files.size())]; + + uint64_t file_size; + RETURN_NOT_OK(file->Size(&file_size)); + uint64_t off = file_size > 0 ? 
rand->Uniform(file_size) : 0; + uint8 buf[64]; + RETURN_NOT_OK(file->Write(off, GenerateRandomChunk(buf, sizeof(buf), rand))); + (*metrics)[BaseName(file->filename())]["write"]++; + return Status::OK(); +} + +template <> +Status FileCacheStressTest<RandomAccessFile>::WriteRandomChunk( + const deque<shared_ptr<RandomAccessFile>>& /* unused */, + MetricMap* /* unused */, + Random* /* unused */) { + return Status::OK(); +} + +typedef ::testing::Types<RWFile, RandomAccessFile> FileTypes; +TYPED_TEST_CASE(FileCacheStressTest, FileTypes); + +TYPED_TEST(FileCacheStressTest, TestStress) { + OverrideFlagForSlowTests("test_num_producer_threads", "2"); + OverrideFlagForSlowTests("test_num_consumer_threads", "8"); + OverrideFlagForSlowTests("test_duration_secs", "30"); + + // Start the threads. + PeriodicOpenFdChecker checker( + this->env_, + FLAGS_test_max_open_files + // cache capacity + FLAGS_test_num_producer_threads + // files being written + FLAGS_test_num_consumer_threads); // files being opened + checker.Start(); + vector<thread> producers; + for (int i = 0; i < FLAGS_test_num_producer_threads; i++) { + producers.emplace_back(&FileCacheStressTest<TypeParam>::ProducerThread, this); + } + vector<thread> consumers; + for (int i = 0; i < FLAGS_test_num_consumer_threads; i++) { + consumers.emplace_back(&FileCacheStressTest<TypeParam>::ConsumerThread, this); + } + + // Let the test run. + SleepFor(MonoDelta::FromSeconds(FLAGS_test_duration_secs)); + + // Stop the threads. + this->NotifyThreads(); + checker.Stop(); + for (auto& p : producers) { + p.join(); + } + for (auto& c : consumers) { + c.join(); + } + + // Log the metrics. 
+ unordered_map<string, int> action_counts; + for (const auto& file_action_pair : this->metrics()) { + for (const auto& action_count_pair : file_action_pair.second) { + VLOG(2) << Substitute("$0: $1: $2", + file_action_pair.first, + action_count_pair.first, + action_count_pair.second); + action_counts[action_count_pair.first] += action_count_pair.second; + } + } + for (const auto& action_count_pair : action_counts) { + LOG(INFO) << Substitute("$0: $1", + action_count_pair.first, + action_count_pair.second); + } +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/file_cache-test-util.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/file_cache-test-util.h b/be/src/kudu/util/file_cache-test-util.h new file mode 100644 index 0000000..8b0bcce --- /dev/null +++ b/be/src/kudu/util/file_cache-test-util.h @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <thread> + +#include <glog/logging.h> + +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/env.h" +#include "kudu/util/logging.h" +#include "kudu/util/monotime.h" +#include "kudu/util/test_util.h" + +namespace kudu { + +// Periodically checks the number of open file descriptors belonging to this +// process, crashing if it exceeds some upper bound. +class PeriodicOpenFdChecker { + public: + PeriodicOpenFdChecker(Env* env, int upper_bound) + : env_(env), + initial_fd_count_(CountOpenFds(env)), + max_fd_count_(upper_bound + initial_fd_count_), + running_(1), + started_(false) {} + + ~PeriodicOpenFdChecker() { Stop(); } + + void Start() { + DCHECK(!started_); + running_.Reset(1); + check_thread_ = std::thread(&PeriodicOpenFdChecker::CheckThread, this); + started_ = true; + } + + void Stop() { + if (started_) { + running_.CountDown(); + check_thread_.join(); + started_ = false; + } + } + + private: + void CheckThread() { + LOG(INFO) << strings::Substitute("Periodic open fd checker starting " + "(initial: $0 max: $1)", + initial_fd_count_, max_fd_count_); + do { + int open_fd_count = CountOpenFds(env_); + KLOG_EVERY_N_SECS(INFO, 1) << strings::Substitute("Open fd count: $0/$1", + open_fd_count, + max_fd_count_); + CHECK_LE(open_fd_count, max_fd_count_); + } while (!running_.WaitFor(MonoDelta::FromMilliseconds(100))); + } + + Env* env_; + const int initial_fd_count_; + const int max_fd_count_; + + CountDownLatch running_; + std::thread check_thread_; + bool started_; +}; + +} // namespace kudu
