http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/env_util.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/env_util.cc b/be/src/kudu/util/env_util.cc
new file mode 100644
index 0000000..e9117a4
--- /dev/null
+++ b/be/src/kudu/util/env_util.cc
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/env_util.h"
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/bind.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/util/debug-util.h"
+#include "kudu/util/env.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/status.h"
+
+// Test-only override for the free-space figure used by
+// VerifySufficientDiskSpace(): any value greater than -1 replaces the number
+// reported by the Env; -1 leaves the real value in place.
+DEFINE_int64(disk_reserved_bytes_free_for_testing, -1,
+             "For testing only! Set to number of bytes free on each filesystem. "
+             "Set to -1 to disable this test-specific override");
+TAG_FLAG(disk_reserved_bytes_free_for_testing, runtime);
+TAG_FLAG(disk_reserved_bytes_free_for_testing, unsafe);
+
+// We define some flags for testing purposes: Two prefixes and their associated
+// "bytes free" overrides. An override only applies when its byte count is not
+// -1, its prefix is non-empty, and the checked path starts with that prefix.
+DEFINE_string(disk_reserved_override_prefix_1_path_for_testing, "",
+              "For testing only! Specifies a prefix to override the visible 'bytes free' on. "
+              "Use --disk_reserved_override_prefix_1_bytes_free_for_testing to set the number of "
+              "bytes free for this path prefix. Set to empty string to disable.");
+DEFINE_int64(disk_reserved_override_prefix_1_bytes_free_for_testing, -1,
+             "For testing only! Set number of bytes free on the path prefix specified by "
+             "--disk_reserved_override_prefix_1_path_for_testing. Set to -1 to disable.");
+DEFINE_string(disk_reserved_override_prefix_2_path_for_testing, "",
+              "For testing only! Specifies a prefix to override the visible 'bytes free' on. "
+              "Use --disk_reserved_override_prefix_2_bytes_free_for_testing to set the number of "
+              "bytes free for this path prefix. Set to empty string to disable.");
+DEFINE_int64(disk_reserved_override_prefix_2_bytes_free_for_testing, -1,
+             "For testing only! Set number of bytes free on the path prefix specified by "
+             "--disk_reserved_override_prefix_2_path_for_testing. Set to -1 to disable.");
+TAG_FLAG(disk_reserved_override_prefix_1_path_for_testing, unsafe);
+TAG_FLAG(disk_reserved_override_prefix_2_path_for_testing, unsafe);
+TAG_FLAG(disk_reserved_override_prefix_1_bytes_free_for_testing, unsafe);
+TAG_FLAG(disk_reserved_override_prefix_2_bytes_free_for_testing, unsafe);
+TAG_FLAG(disk_reserved_override_prefix_1_bytes_free_for_testing, runtime);
+TAG_FLAG(disk_reserved_override_prefix_2_bytes_free_for_testing, runtime);
+
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace env_util {
+
+Status OpenFileForWrite(Env* env, const string& path,
+                        shared_ptr<WritableFile>* file) {
+  // No caller-supplied options: forward default-constructed ones to the
+  // options-taking overload.
+  const WritableFileOptions default_opts = WritableFileOptions();
+  return OpenFileForWrite(default_opts, env, path, file);
+}
+
+Status OpenFileForWrite(const WritableFileOptions& opts,
+                        Env* env, const string& path,
+                        shared_ptr<WritableFile>* file) {
+  // Open through the Env first; '*file' is only touched on success.
+  unique_ptr<WritableFile> writable;
+  const Status open_status = env->NewWritableFile(opts, path, &writable);
+  if (!open_status.ok()) {
+    return open_status;
+  }
+  // Hand ownership over to the caller's shared_ptr.
+  file->reset(writable.release());
+  return Status::OK();
+}
+
+Status OpenFileForRandom(Env* env, const string& path,
+                         shared_ptr<RandomAccessFile>* file) {
+  // Open through the Env first; '*file' is only touched on success.
+  unique_ptr<RandomAccessFile> random_access;
+  const Status open_status = env->NewRandomAccessFile(path, &random_access);
+  if (!open_status.ok()) {
+    return open_status;
+  }
+  // Hand ownership over to the caller's shared_ptr.
+  file->reset(random_access.release());
+  return Status::OK();
+}
+
+Status OpenFileForSequential(Env* env, const string& path,
+                             shared_ptr<SequentialFile>* file) {
+  // Open through the Env first; '*file' is only touched on success.
+  unique_ptr<SequentialFile> sequential;
+  const Status open_status = env->NewSequentialFile(path, &sequential);
+  if (!open_status.ok()) {
+    return open_status;
+  }
+  // Hand ownership over to the caller's shared_ptr.
+  file->reset(sequential.release());
+  return Status::OK();
+}
+
+// Applies the per-prefix test overrides to '*bytes_free'. The first
+// (prefix, bytes) flag pair whose byte count is not -1 and whose non-empty
+// prefix matches the start of 'path' wins; if no pair applies, '*bytes_free'
+// is left untouched. See the definitions of these test-only flags for more
+// information.
+static void OverrideBytesFreeWithTestingFlags(const string& path, int64_t* bytes_free) {
+  const string* prefixes[] = { &FLAGS_disk_reserved_override_prefix_1_path_for_testing,
+                               &FLAGS_disk_reserved_override_prefix_2_path_for_testing };
+  const int64_t* overrides[] = { &FLAGS_disk_reserved_override_prefix_1_bytes_free_for_testing,
+                                 &FLAGS_disk_reserved_override_prefix_2_bytes_free_for_testing };
+  for (int i = 0; i < arraysize(prefixes); i++) {
+    const int64_t override_bytes = *overrides[i];
+    if (override_bytes == -1) continue;  // This override is disabled.
+    const string& prefix = *prefixes[i];
+    if (prefix.empty()) continue;        // No prefix configured.
+    if (!HasPrefixString(path, prefix)) continue;
+    *bytes_free = override_bytes;
+    return;
+  }
+}
+
+// Checks that writing 'requested_bytes' under 'path' would still leave at
+// least 'reserved_bytes' free on that filesystem.
+//
+// The free-space figure comes from env->GetSpaceInfo() unless one of the
+// test-only flags overrides it. A 'reserved_bytes' of -1 is replaced by one
+// percent of the filesystem capacity. Returns an IOError carrying ENOSPC when
+// the space is insufficient, the GetSpaceInfo() error if that call fails, and
+// OK otherwise.
+Status VerifySufficientDiskSpace(Env *env, const std::string& path,
+                                 int64_t requested_bytes, int64_t reserved_bytes) {
+  const int64_t kOnePercentReservation = -1;
+  DCHECK_GE(requested_bytes, 0);
+
+  SpaceInfo space_info;
+  RETURN_NOT_OK(env->GetSpaceInfo(path, &space_info));
+  int64_t available_bytes = space_info.free_bytes;
+
+  // Allow overriding these values by tests.
+  if (PREDICT_FALSE(FLAGS_disk_reserved_bytes_free_for_testing > -1)) {
+    available_bytes = FLAGS_disk_reserved_bytes_free_for_testing;
+  }
+  // The per-prefix overrides are applied second, so they take precedence over
+  // the global override above when both match.
+  if (PREDICT_FALSE(FLAGS_disk_reserved_override_prefix_1_bytes_free_for_testing != -1 ||
+                    FLAGS_disk_reserved_override_prefix_2_bytes_free_for_testing != -1)) {
+    OverrideBytesFreeWithTestingFlags(path, &available_bytes);
+  }
+
+  // If they requested a one percent reservation, calculate what that is in bytes.
+  if (reserved_bytes == kOnePercentReservation) {
+    reserved_bytes = space_info.capacity_bytes / 100;
+  }
+
+  if (available_bytes - requested_bytes < reserved_bytes) {
+    return Status::IOError(Substitute("Insufficient disk space to allocate $0 bytes under path $1 "
+                                      "($2 bytes available vs $3 bytes reserved)",
+                                      requested_bytes, path, available_bytes, reserved_bytes),
+                           "", ENOSPC);
+  }
+  return Status::OK();
+}
+
+Status CreateDirIfMissing(Env* env, const string& path, bool* created) {
+  const Status create_status = env->CreateDir(path);
+  // Only a successful CreateDir() means this call made the directory.
+  if (created != nullptr) {
+    *created = create_status.ok();
+  }
+  // An AlreadyPresent result is reported to the caller as success.
+  if (create_status.IsAlreadyPresent()) {
+    return Status::OK();
+  }
+  return create_status;
+}
+
+// Walks 'path' segment by segment, creating each prefix that is not already a
+// directory (or a symlink resolving to one). Returns the first CreateDir() or
+// Canonicalize() failure; earlier segments may already have been created.
+Status CreateDirsRecursively(Env* env, const string& path) {
+  vector<string> segments = SplitPath(path);
+  string partial_path;
+  for (const string& segment : segments) {
+    partial_path = partial_path.empty() ? segment : JoinPathSegments(partial_path, segment);
+    // Initialised so the value is defined even if IsDirectory() leaves it alone.
+    bool is_dir = false;
+    Status s = env->IsDirectory(partial_path, &is_dir);
+    if (s.ok()) {
+      // We didn't get a NotFound error, so something is there.
+      if (is_dir) continue; // It's a normal directory.
+      // Maybe a file or a symlink. Let's try to follow the symlink.
+      string real_partial_path;
+      RETURN_NOT_OK(env->Canonicalize(partial_path, &real_partial_path));
+      s = env->IsDirectory(real_partial_path, &is_dir);
+      if (s.ok() && is_dir) continue; // It's a symlink to a directory.
+    }
+    // Nothing usable exists at this prefix: try to create it and let
+    // CreateDir() report why that is not possible (e.g. a regular file is in the way).
+    RETURN_NOT_OK_PREPEND(env->CreateDir(partial_path), "Unable to create directory");
+  }
+  return Status::OK();
+}
+
+// Copies the contents of 'source_path' into a new file at 'dest_path' opened
+// with 'opts', reading through a 1 MiB scratch buffer.
+//
+// The amount to copy is the source size reported by GetFileSize() before the
+// copy starts. Returns the first error from the Env or the files. If the
+// source yields no data before that many bytes were read (e.g. it was
+// truncated concurrently), an IOError is returned instead of looping forever.
+// A partial copy may be left behind in 'dest_path' on failure.
+Status CopyFile(Env* env, const string& source_path, const string& dest_path,
+                WritableFileOptions opts) {
+  unique_ptr<SequentialFile> source;
+  RETURN_NOT_OK(env->NewSequentialFile(source_path, &source));
+  uint64_t size;
+  RETURN_NOT_OK(env->GetFileSize(source_path, &size));
+
+  unique_ptr<WritableFile> dest;
+  RETURN_NOT_OK(env->NewWritableFile(opts, dest_path, &dest));
+  RETURN_NOT_OK(dest->PreAllocate(size));
+
+  const int32_t kBufferSize = 1024 * 1024;
+  unique_ptr<uint8_t[]> scratch(new uint8_t[kBufferSize]);
+
+  uint64_t bytes_read = 0;
+  while (bytes_read < size) {
+    uint64_t max_bytes_to_read = std::min<uint64_t>(size - bytes_read, kBufferSize);
+    Slice data(scratch.get(), max_bytes_to_read);
+    RETURN_NOT_OK(source->Read(&data));
+    if (data.size() == 0) {
+      // A successful zero-length read makes no progress; without this check
+      // the loop would never terminate.
+      return Status::IOError(
+          Substitute("Unexpected end of file while copying $0 to $1: read $2 of $3 bytes",
+                     source_path, dest_path, bytes_read, size));
+    }
+    RETURN_NOT_OK(dest->Append(data));
+    bytes_read += data.size();
+  }
+  return Status::OK();
+}
+
+// Globs 'pattern' and, when more than 'max_matches' files match, deletes the
+// files with the smallest modification times until 'max_matches' remain.
+// Returns the first error from Glob(), GetFileModifiedTime() or DeleteFile();
+// files deleted before a failure stay deleted.
+Status DeleteExcessFilesByPattern(Env* env, const string& pattern, int max_matches) {
+  // Negative numbers don't make sense for our interface.
+  DCHECK_GE(max_matches, 0);
+
+  vector<string> matching_files;
+  RETURN_NOT_OK(env->Glob(pattern, &matching_files));
+
+  // Make the signed -> unsigned conversion explicit; it is the same
+  // conversion the comparison and subtraction below performed implicitly.
+  const size_t max_to_keep = static_cast<size_t>(max_matches);
+  if (matching_files.size() <= max_to_keep) {
+    return Status::OK();
+  }
+
+  // Keep the mtime in the type GetFileModifiedTime() produces so that it is
+  // never narrowed on platforms where time_t is smaller than 64 bits.
+  vector<std::pair<int64_t, string>> matching_file_mtimes;
+  matching_file_mtimes.reserve(matching_files.size());
+  for (string& matching_file_path : matching_files) {
+    int64_t mtime;
+    RETURN_NOT_OK(env->GetFileModifiedTime(matching_file_path, &mtime));
+    matching_file_mtimes.emplace_back(mtime, std::move(matching_file_path));
+  }
+
+  // Use mtime to determine which matching files to delete. This could
+  // potentially be ambiguous, depending on the resolution of last-modified
+  // timestamp in the filesystem, but that is part of the contract.
+  // After the ascending sort, truncating the vector leaves only the oldest
+  // entries, i.e. exactly the ones to delete.
+  std::sort(matching_file_mtimes.begin(), matching_file_mtimes.end());
+  matching_file_mtimes.resize(matching_file_mtimes.size() - max_to_keep);
+
+  for (const auto& matching_file : matching_file_mtimes) {
+    RETURN_NOT_OK(env->DeleteFile(matching_file.second));
+  }
+
+  return Status::OK();
+}
+
+// Walk callback used by DeleteTmpFilesRecursively().
+//
+// Entries that are not regular files are ignored. A regular file whose
+// 'basename' contains the Kudu-specific tmp infix is deleted; a failed
+// deletion is only logged as a warning. Always returns OK.
+static Status DeleteTmpFilesRecursivelyCb(Env* env,
+                                          Env::FileType file_type,
+                                          const string& dirname,
+                                          const string& basename) {
+  const bool is_tmp_file = file_type == Env::FILE_TYPE &&
+                           basename.find(kTmpInfix) != string::npos;
+  if (is_tmp_file) {
+    const string tmp_path = JoinPathSegments(dirname, basename);
+    WARN_NOT_OK(env->DeleteFile(tmp_path),
+                Substitute("Failed to remove temporary file $0", tmp_path));
+  }
+  return Status::OK();
+}
+
+Status DeleteTmpFilesRecursively(Env* env, const string& path) {
+  // Visit every entry under 'path' in pre-order; the callback (bound to
+  // 'env') removes the tmp files it encounters.
+  return env->Walk(path, Env::PRE_ORDER, Bind(&DeleteTmpFilesRecursivelyCb, env));
+}
+
+ScopedFileDeleter::ScopedFileDeleter(Env* env, std::string path)
+    : env_(DCHECK_NOTNULL(env)),
+      path_(std::move(path)),
+      should_delete_(true) {
+  // Deletion is armed from construction on.
+}
+
+ScopedFileDeleter::~ScopedFileDeleter() {
+  // Nothing to do if deletion was disarmed.
+  if (!should_delete_) {
+    return;
+  }
+
+  // Decide between directory and file removal; give up (with a warning) if
+  // the path cannot be examined.
+  bool is_dir;
+  const Status dir_check = env_->IsDirectory(path_, &is_dir);
+  WARN_NOT_OK(dir_check, Substitute(
+      "Failed to determine if path is a directory: $0", path_));
+  if (!dir_check.ok()) {
+    return;
+  }
+
+  // Removal failures are logged, never propagated.
+  if (is_dir) {
+    WARN_NOT_OK(env_->DeleteDir(path_),
+                Substitute("Failed to remove directory: $0", path_));
+  } else {
+    WARN_NOT_OK(env_->DeleteFile(path_),
+        Substitute("Failed to remove file: $0", path_));
+  }
+}
+
+} // namespace env_util
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/env_util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/env_util.h b/be/src/kudu/util/env_util.h
new file mode 100644
index 0000000..f93d81b
--- /dev/null
+++ b/be/src/kudu/util/env_util.h
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_ENV_UTIL_H
+#define KUDU_UTIL_ENV_UTIL_H
+
+#include <memory>
+#include <string>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/env.h"
+
+namespace kudu {
+namespace env_util {
+
+// Opens 'path' for writing through 'env' using default-constructed
+// WritableFileOptions. On success '*file' takes ownership of the new file.
+Status OpenFileForWrite(Env* env, const std::string& path,
+                        std::shared_ptr<WritableFile>* file);
+
+// Same as above, but opens the file with the caller-supplied 'opts'.
+Status OpenFileForWrite(const WritableFileOptions& opts,
+                        Env* env, const std::string& path,
+                        std::shared_ptr<WritableFile>* file);
+
+// Opens 'path' for random-access reads through 'env'. On success '*file'
+// takes ownership of the new file.
+Status OpenFileForRandom(Env* env, const std::string& path,
+                         std::shared_ptr<RandomAccessFile>* file);
+
+// Opens 'path' for sequential reads through 'env'. On success '*file' takes
+// ownership of the new file.
+Status OpenFileForSequential(Env* env, const std::string& path,
+                             std::shared_ptr<SequentialFile>* file);
+
+// Returns Status::IOError with POSIX code ENOSPC if there is not sufficient
+// disk space to write 'requested_bytes' bytes to the file system represented
+// by 'path' while still leaving 'reserved_bytes' bytes free.
+// Otherwise returns OK.
+// If 'reserved_bytes' equals -1, it is interpreted as a reservation of 1% of
+// the file system's capacity. No other values less than 0 are supported at
+// this time.
+Status VerifySufficientDiskSpace(Env *env, const std::string& path,
+                                 int64_t requested_bytes, int64_t 
reserved_bytes);
+
+// Creates the directory given by 'path', unless it already exists.
+//
+// If 'created' is not NULL, sets it to true if the directory was
+// created, false otherwise.
+Status CreateDirIfMissing(Env* env, const std::string& path,
+                          bool* created = NULL);
+
+// Recursively create directories, if they do not exist, along the given path.
+// Returns OK if successful or if the given path already existed.
+// Upon failure, it is possible that some part of the directory structure may
+// have been successfully created. Emulates the behavior of `mkdir -p`.
+Status CreateDirsRecursively(Env* env, const std::string& path);
+
+// Copy the contents of file source_path to file dest_path.
+// This is not atomic, and if there is an error while reading or writing,
+// a partial copy may be left in 'dest_path'. Does not fsync the parent
+// directory of dest_path -- if you need durability then do that yourself.
+Status CopyFile(Env* env, const std::string& source_path, const std::string& 
dest_path,
+                WritableFileOptions opts);
+
+// Deletes files matching 'pattern' in excess of 'max_matches' files.
+// 'max_matches' must be greater than or equal to 0.
+// The oldest files are deleted first, as determined by last modified time.
+// In the case that multiple files have the same last modified time, it is not
+// defined which file will be deleted first.
+Status DeleteExcessFilesByPattern(Env* env, const std::string& pattern, int 
max_matches);
+
+// Traverses 'path' recursively and deletes all files matching the special Kudu
+// tmp file infix. Does not follow symlinks.
+//
+// Deletion errors generate warnings but do not halt the traversal.
+Status DeleteTmpFilesRecursively(Env* env, const std::string& path);
+
+// Scope guard that removes a file or directory when it is destroyed.
+//
+// Call .Cancel() to keep the path. This is typically useful for cleaning up
+// temporary files if the creation of the tmp file may fail.
+class ScopedFileDeleter {
+ public:
+  // 'env' is the Env used to examine and remove 'path' on destruction.
+  ScopedFileDeleter(Env* env, std::string path);
+  ~ScopedFileDeleter();
+
+  // Disarms the guard: the path is left alone when this object is destroyed.
+  void Cancel() { should_delete_ = false; }
+
+ private:
+  Env* const env_;
+  const std::string path_;
+  bool should_delete_;  // Cleared by Cancel().
+
+  DISALLOW_COPY_AND_ASSIGN(ScopedFileDeleter);
+};
+
+} // namespace env_util
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/errno-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/errno-test.cc b/be/src/kudu/util/errno-test.cc
new file mode 100644
index 0000000..911ca14
--- /dev/null
+++ b/be/src/kudu/util/errno-test.cc
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/errno.h"
+
+using std::string;
+
+namespace kudu {
+
+TEST(OsUtilTest, TestErrnoToString) {
+  const int kKnownErrno = ENOENT;
+  const int kUnknownErrno = -1;
+
+  // With ample room, a known errno yields its full message.
+  ASSERT_EQ("No such file or directory", ErrnoToString(kKnownErrno));
+
+  // A two-byte buffer only has room for one character plus the terminator.
+  char tiny_buf[2];
+  ErrnoToCString(kKnownErrno, tiny_buf, arraysize(tiny_buf));
+  ASSERT_EQ("N", string(tiny_buf));
+
+  // An unknown errno produces a message that starts with "Unknown error".
+  const string unknown_prefix = "Unknown error";
+  ASSERT_EQ(ErrnoToString(kUnknownErrno).compare(0, unknown_prefix.length(), unknown_prefix), 0);
+
+  // The unknown-errno message is truncated the same way.
+  ErrnoToCString(kUnknownErrno, tiny_buf, arraysize(tiny_buf));
+  ASSERT_EQ("U", string(tiny_buf));
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/errno.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/errno.cc b/be/src/kudu/util/errno.cc
new file mode 100644
index 0000000..65b2173
--- /dev/null
+++ b/be/src/kudu/util/errno.cc
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/errno.h"
+
+#include <errno.h>
+#include <string.h>
+
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/util/logging.h"
+
+namespace kudu {
+
+// Writes a description of errno value 'err' into 'buf', which holds
+// 'buf_len' bytes; 'buf_len' must be greater than zero.
+//
+// strerror_r() has two incompatible signatures, so the implementation is
+// selected at compile time: the POSIX variant returns an int error code and
+// fills 'buf', while the GNU variant returns a char* that may or may not
+// point at 'buf'.
+void ErrnoToCString(int err, char *buf, size_t buf_len) {
+  CHECK_GT(buf_len, 0);
+#if !defined(__GLIBC__) || \
+  ((_POSIX_C_SOURCE >= 200112 || _XOPEN_SOURCE >= 600) && !defined(_GNU_SOURCE))
+  // Using POSIX version 'int strerror_r(...)'.
+  int ret = strerror_r(err, buf, buf_len);
+  // ERANGE and EINVAL are tolerated: 'buf' is used as strerror_r() left it.
+  // Any other failure gets a fixed fallback message.
+  if (ret && ret != ERANGE && ret != EINVAL) {
+    strncpy(buf, "unknown error", buf_len);
+    buf[buf_len - 1] = '\0';
+  }
+#else
+  // Using GLIBC version
+
+  // KUDU-1515: TSAN in Clang 3.9 has an incorrect interceptor for strerror_r:
+  // https://github.com/google/sanitizers/issues/696
+  ANNOTATE_IGNORE_WRITES_BEGIN();
+  char* ret = strerror_r(err, buf, buf_len);
+  ANNOTATE_IGNORE_WRITES_END();
+  // The GNU variant may return a pointer to a string other than 'buf'; copy
+  // it over, truncating and NUL-terminating as needed.
+  if (ret != buf) {
+    strncpy(buf, ret, buf_len);
+    buf[buf_len - 1] = '\0';
+  }
+#endif
+}
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/errno.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/errno.h b/be/src/kudu/util/errno.h
new file mode 100644
index 0000000..7d5416e
--- /dev/null
+++ b/be/src/kudu/util/errno.h
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_ERRNO_H
+#define KUDU_ERRNO_H
+
+#include <string>
+
+namespace kudu {
+
+// Writes a description of errno value 'err' into 'buf', which must hold
+// 'buf_len' (> 0) bytes.
+void ErrnoToCString(int err, char* buf, size_t buf_len);
+
+// Returns the description of errno value 'err' as a std::string, formatted
+// via ErrnoToCString() into a 512-byte stack buffer.
+inline static std::string ErrnoToString(int err) {
+  char message[512];
+  ErrnoToCString(err, message, sizeof(message));
+  return std::string(message);
+}
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/failure_detector-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/failure_detector-test.cc 
b/be/src/kudu/util/failure_detector-test.cc
new file mode 100644
index 0000000..306d81a
--- /dev/null
+++ b/be/src/kudu/util/failure_detector-test.cc
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#include <gtest/gtest.h>
+#include <string>
+
+#include "kudu/gutil/bind.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/failure_detector.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+
+// How often we expect a node to heartbeat to assert its "aliveness".
+static const int kExpectedHeartbeatPeriodMillis = 100;
+
+// Number of missed heartbeat periods after which the failure detector (FD) will consider the node dead.
+static const int kMaxMissedHeartbeats = 2;
+
+// Let's check for failures every 100ms on average +/- 10ms.
+static const int kFailureMonitorMeanMillis = 100;
+static const int kFailureMonitorStddevMillis = 10;
+
+static const char* kNodeName = "node-1";  // Name of the node tracked by the detector under test.
+static const char* kTestTabletName = "test-tablet";  // Name under which the detector is registered with the monitor.
+
+// Fixture that owns a RandomizedFailureMonitor plus a latch which the failure
+// callback counts down.
+class FailureDetectorTest : public KuduTest {
+ public:
+  FailureDetectorTest()
+    : KuduTest(),
+      latch_(1),
+      monitor_(new RandomizedFailureMonitor(SeedRandom(),
+                                            kFailureMonitorMeanMillis,
+                                            kFailureMonitorStddevMillis)) {
+  }
+
+  // Failure callback handed to the detector: logs the failed node's name and
+  // counts the latch down. 'status' is not inspected.
+  void FailureFunction(const std::string& name, const Status& status) {
+    LOG(INFO) << "Detected failure of " << name;
+    latch_.CountDown();
+  }
+
+ protected:
+  // Blocks until the latch has been counted down by FailureFunction().
+  void WaitForFailure() { latch_.Wait(); }
+
+  CountDownLatch latch_;  // Starts at 1.
+  gscoped_ptr<RandomizedFailureMonitor> monitor_;
+};
+
+// Tests that we can track a node, that while we notify that we've received messages from
+// that node everything is ok and that once we stop doing so the failure detection function
+// gets called.
+TEST_F(FailureDetectorTest, TestDetectsFailure) {
+  ASSERT_OK(monitor_->Start());
+
+  scoped_refptr<FailureDetector> detector(new TimedFailureDetector(
+      MonoDelta::FromMilliseconds(kExpectedHeartbeatPeriodMillis * kMaxMissedHeartbeats)));
+
+  // Registration returns a Status (e.g. AlreadyPresent), so check it like
+  // every other call in this test.
+  ASSERT_OK(monitor_->MonitorFailureDetector(kTestTabletName, detector));
+  ASSERT_FALSE(detector->IsTracking(kNodeName));
+  ASSERT_OK(detector->Track(kNodeName,
+                            MonoTime::Now(),
+                            Bind(&FailureDetectorTest::FailureFunction, Unretained(this))));
+  ASSERT_TRUE(detector->IsTracking(kNodeName));
+
+  const int kNumPeriodsToWait = 4;  // Num heartbeat periods to wait for a failure.
+  const int kUpdatesPerPeriod = 10; // Num updates we give per period to minimize test flakiness.
+
+  for (int i = 0; i < kNumPeriodsToWait * kUpdatesPerPeriod; i++) {
+    // Report in (heartbeat) to the detector.
+    ASSERT_OK(detector->MessageFrom(kNodeName, MonoTime::Now()));
+
+    // We sleep for a fraction of heartbeat period, to minimize test flakiness.
+    SleepFor(MonoDelta::FromMilliseconds(kExpectedHeartbeatPeriodMillis / kUpdatesPerPeriod));
+
+    // The latch shouldn't have counted down, since the node's been reporting that
+    // it's still alive.
+    ASSERT_EQ(1, latch_.count());
+  }
+
+  // If we stop reporting the node is alive the failure callback is eventually
+  // triggered and we exit.
+  WaitForFailure();
+
+  ASSERT_OK(detector->UnTrack(kNodeName));
+  ASSERT_FALSE(detector->IsTracking(kNodeName));
+
+  ASSERT_OK(monitor_->UnmonitorFailureDetector(kTestTabletName));
+  monitor_->Shutdown();
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/failure_detector.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/failure_detector.cc 
b/be/src/kudu/util/failure_detector.cc
new file mode 100644
index 0000000..510cbca
--- /dev/null
+++ b/be/src/kudu/util/failure_detector.cc
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/failure_detector.h"
+
+#include <glog/logging.h>
+#include <mutex>
+#include <unordered_map>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/random_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+namespace kudu {
+
+using std::unordered_map;
+using strings::Substitute;
+
+const int64_t RandomizedFailureMonitor::kMinWakeUpTimeMillis = 10;  // Lower bound applied to each randomized wait in RunThread().
+
+TimedFailureDetector::TimedFailureDetector(MonoDelta failure_period)
+    : failure_period_(std::move(failure_period)) {
+  // 'failure_period_' is the silence interval after which a tracked node is
+  // marked DEAD.
+}
+
+TimedFailureDetector::~TimedFailureDetector() {
+  // 'nodes_' holds raw Node pointers released into it by Track(); clean up
+  // whatever is still being tracked.
+  STLDeleteValues(&nodes_);
+}
+
+// Starts tracking node 'name', initially ALIVE and last heard of at 'now'.
+// Returns AlreadyPresent if a node with that name is already tracked.
+Status TimedFailureDetector::Track(const string& name,
+                                   const MonoTime& now,
+                                   const FailureDetectedCallback& callback) {
+  std::lock_guard<simple_spinlock> lock(lock_);
+  gscoped_ptr<Node> new_node(new Node);
+  new_node->permanent_name = name;
+  new_node->callback = callback;
+  new_node->last_heard_of = now;
+  new_node->status = ALIVE;
+
+  const bool inserted = InsertIfNotPresent(&nodes_, name, new_node.get());
+  if (!inserted) {
+    // 'new_node' still owns the allocation and frees it on return.
+    return Status::AlreadyPresent(
+        Substitute("Node with name '$0' is already being monitored", name));
+  }
+  // The map now owns the node.
+  ignore_result(new_node.release());
+  return Status::OK();
+}
+
+// Stops tracking node 'name' and frees its record. Returns NotFound if the
+// node is not being tracked.
+Status TimedFailureDetector::UnTrack(const string& name) {
+  std::lock_guard<simple_spinlock> lock(lock_);
+  Node* removed = EraseKeyReturnValuePtr(&nodes_, name);
+  if (PREDICT_TRUE(removed != NULL)) {
+    delete removed;
+    return Status::OK();
+  }
+  return Status::NotFound(Substitute("Node with name '$0' not found", name));
+}
+
+// Returns true if a node called 'name' is currently tracked.
+bool TimedFailureDetector::IsTracking(const std::string& name) {
+  std::lock_guard<simple_spinlock> lock(lock_);
+  const bool tracked = ContainsKey(nodes_, name);
+  return tracked;
+}
+
+// Records that node 'name' was heard from at 'now' and marks it ALIVE.
+// Returns NotFound if the node is not being tracked.
+Status TimedFailureDetector::MessageFrom(const std::string& name, const MonoTime& now) {
+  VLOG(3) << "Received message from " << name << " at " << now.ToString();
+  std::lock_guard<simple_spinlock> lock(lock_);
+  Node* tracked = FindPtrOrNull(nodes_, name);
+  if (tracked != NULL) {
+    tracked->last_heard_of = now;
+    tracked->status = ALIVE;
+    return Status::OK();
+  }
+  VLOG(1) << "Not tracking node: " << name;
+  return Status::NotFound(Substitute("Message from unknown node '$0'", name));
+}
+
+// Returns the status of node 'name', first marking it DEAD if more than
+// 'failure_period_' has elapsed between its last message and 'now'. The node
+// must be tracked. Takes no lock itself; the caller in this file holds 'lock_'.
+FailureDetector::NodeStatus TimedFailureDetector::GetNodeStatusUnlocked(const std::string& name,
+                                                                        const MonoTime& now) {
+  Node* node = FindOrDie(nodes_, name);
+  const bool timed_out = (now - node->last_heard_of) > failure_period_;
+  if (timed_out) {
+    node->status = DEAD;
+  }
+  return node->status;
+}
+
+// Re-evaluates every tracked node against 'now' and runs the failure
+// callback of each node found DEAD.
+void TimedFailureDetector::CheckForFailures(const MonoTime& now) {
+  typedef unordered_map<string, FailureDetectedCallback> CallbackMap;
+  CallbackMap dead_node_callbacks;
+  {
+    // Collect the callbacks under the lock...
+    std::lock_guard<simple_spinlock> lock(lock_);
+    for (const NodeMap::value_type& tracked : nodes_) {
+      const string& tracked_name = tracked.first;
+      if (GetNodeStatusUnlocked(tracked_name, now) != DEAD) {
+        continue;
+      }
+      InsertOrDie(&dead_node_callbacks, tracked_name, tracked.second->callback);
+    }
+  }
+  // ...and invoke them outside of it.
+  for (const CallbackMap::value_type& dead : dead_node_callbacks) {
+    const string& dead_name = dead.first;
+    dead.second.Run(dead_name, Status::RemoteError(Substitute("Node '$0' failed", dead_name)));
+  }
+}
+
+RandomizedFailureMonitor::RandomizedFailureMonitor(uint32_t random_seed,
+                                                   int64_t period_mean_millis,
+                                                   int64_t period_stddev_millis)
+    : period_mean_millis_(period_mean_millis),
+      period_stddev_millis_(period_stddev_millis),
+      random_(random_seed),
+      run_latch_(0),
+      shutdown_(false) {
+  // No thread is started here; Start() arms 'run_latch_' and spawns it.
+}
+
+RandomizedFailureMonitor::~RandomizedFailureMonitor() {
+  // Stop and join the monitor thread if it is still running.
+  Shutdown();
+}
+
+// Spawns the monitor thread running RunThread(). Must not be called while a
+// thread already exists.
+Status RandomizedFailureMonitor::Start() {
+  CHECK(!thread_);
+  // Arm the latch; Shutdown() counts it down to wake the thread.
+  run_latch_.Reset(1);
+  return Thread::Create("failure-monitors", "failure-monitor",
+                        &RandomizedFailureMonitor::RunThread, this,
+                        &thread_);
+}
+
+// Signals the monitor thread to stop and joins it. Does nothing if no thread
+// exists or shutdown was already requested.
+void RandomizedFailureMonitor::Shutdown() {
+  if (!thread_) {
+    return;
+  }
+
+  // Flip the shutdown flag exactly once; a repeated request bails out here.
+  {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    if (shutdown_) {
+      return;
+    }
+    shutdown_ = true;
+  }
+
+  // Wake the monitor thread out of its timed wait, then wait for it to exit.
+  run_latch_.CountDown();
+  CHECK_OK(ThreadJoiner(thread_.get()).Join());
+  thread_.reset();
+}
+
+// Registers failure detector 'fd' under 'name' so that the monitor thread
+// calls its CheckForFailures() on every wake-up. Returns AlreadyPresent if a
+// detector is already registered under that name.
+Status RandomizedFailureMonitor::MonitorFailureDetector(const string& name,
+                                                        const scoped_refptr<FailureDetector>& fd) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  bool inserted = InsertIfNotPresent(&fds_, name, fd);
+  if (PREDICT_FALSE(!inserted)) {
+    return Status::AlreadyPresent(Substitute("Already monitoring failure detector '$0'", name));
+  }
+  return Status::OK();
+}
+
+// Removes the failure detector registered under 'name'. Returns NotFound if
+// no detector is registered under that name.
+Status RandomizedFailureMonitor::UnmonitorFailureDetector(const string& name) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  const int erased = fds_.erase(name);
+  if (PREDICT_TRUE(erased != 0)) {
+    return Status::OK();
+  }
+  return Status::NotFound(Substitute("Failure detector '$0' not found", name));
+}
+
+// Body of the monitor thread: repeatedly waits a randomized interval, then
+// asks every registered failure detector to check for failures. Exits once
+// the latch has fired and shutdown has been requested.
+void RandomizedFailureMonitor::RunThread() {
+  VLOG(1) << "Failure monitor thread starting";
+
+  for (;;) {
+    // Draw the next wait, never going below the minimum wake-up time.
+    int64_t sleep_millis = random_.Normal(period_mean_millis_, period_stddev_millis_);
+    if (sleep_millis < kMinWakeUpTimeMillis) {
+      sleep_millis = kMinWakeUpTimeMillis;
+    }
+
+    const MonoDelta sleep_delta = MonoDelta::FromMilliseconds(sleep_millis);
+    VLOG(3) << "RandomizedFailureMonitor sleeping for: " << sleep_delta.ToString();
+    const bool latch_fired = run_latch_.WaitFor(sleep_delta);
+    if (latch_fired) {
+      // CountDownLatch reached 0: check whether we were told to shut down.
+      std::lock_guard<simple_spinlock> lock(lock_);
+      if (shutdown_) {
+        VLOG(1) << "RandomizedFailureMonitor thread shutting down";
+        return;
+      }
+    }
+
+    // Snapshot the detector map under the lock so the checks below run
+    // without holding it.
+    FDMap fds_snapshot;
+    {
+      std::lock_guard<simple_spinlock> l(lock_);
+      fds_snapshot = fds_;
+    }
+
+    const MonoTime now = MonoTime::Now();
+    for (const FDMap::value_type& registered : fds_snapshot) {
+      registered.second->CheckForFailures(now);
+    }
+  }
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/failure_detector.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/failure_detector.h 
b/be/src/kudu/util/failure_detector.h
new file mode 100644
index 0000000..0290011
--- /dev/null
+++ b/be/src/kudu/util/failure_detector.h
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_UTIL_FAILURE_DETECTOR_H_
+#define KUDU_UTIL_FAILURE_DETECTOR_H_
+
+#include <string>
+#include <unordered_map>
+
+#include "kudu/gutil/callback.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/random.h"
+#include "kudu/util/status_callback.h"
+
+namespace kudu {
+class MonoDelta;
+class MonoTime;
+class Status;
+class Thread;
+
+// Abstract interface implemented by failure detectors.
+//
+// A failure detector is responsible for deciding whether a certain server is
+// dead or alive.
+class FailureDetector : public RefCountedThreadSafe<FailureDetector> {
+ public:
+  // Liveness verdict for a tracked node.
+  enum NodeStatus {
+    DEAD,
+    ALIVE
+  };
+  using StatusMap = std::unordered_map<std::string, NodeStatus>;
+
+  // Signature of the callback triggered when a failure is detected; it
+  // receives a node name and a status.
+  using FailureDetectedCallback = Callback<void(const std::string& name,
+                                                const Status& status)>;
+
+  virtual ~FailureDetector() {}
+
+  // Starts tracking the node called 'name'.
+  //
+  // On Status::OK() the detector expects messages from the machine with
+  // 'name' from now on, and triggers 'callback' if a failure is detected.
+  //
+  // Returns Status::AlreadyPresent() if a machine with 'name' is already
+  // registered in this failure detector.
+  virtual Status Track(const std::string& name,
+                       const MonoTime& now,
+                       const FailureDetectedCallback& callback) = 0;
+
+  // Stops tracking the node called 'name'.
+  virtual Status UnTrack(const std::string& name) = 0;
+
+  // Returns true iff the named entity is currently being tracked.
+  virtual bool IsTracking(const std::string& name) = 0;
+
+  // Records that a message from the machine with 'name' was received at 'now'.
+  virtual Status MessageFrom(const std::string& name, const MonoTime& now) = 0;
+
+  // Evaluates the failure status of each tracked node as of 'now'. If the
+  // failure criteria is met, the failure callback is invoked.
+  virtual void CheckForFailures(const MonoTime& now) = 0;
+};
+
+// A simple FailureDetector that considers a node dead when it has not
+// reported within a certain time interval.
+class TimedFailureDetector : public FailureDetector {
+ public:
+  // Bookkeeping for one monitorable entity.
+  struct Node {
+    std::string permanent_name;
+    MonoTime last_heard_of;
+    FailureDetectedCallback callback;
+    NodeStatus status;
+  };
+
+  explicit TimedFailureDetector(MonoDelta failure_period);
+  virtual ~TimedFailureDetector();
+
+  // FailureDetector interface; see the base class for the contracts.
+  virtual Status Track(const std::string& name,
+                       const MonoTime& now,
+                       const FailureDetectedCallback& callback) OVERRIDE;
+
+  virtual Status UnTrack(const std::string& name) OVERRIDE;
+
+  virtual bool IsTracking(const std::string& name) OVERRIDE;
+
+  virtual Status MessageFrom(const std::string& name, const MonoTime& now) OVERRIDE;
+
+  virtual void CheckForFailures(const MonoTime& now) OVERRIDE;
+
+ private:
+  using NodeMap = std::unordered_map<std::string, Node*>;
+
+  // Determines whether the named node has failed as of 'now', without
+  // invoking its callback.
+  // NOTE(review): the 'Unlocked' suffix suggests the caller must already hold
+  // lock_ -- confirm against the implementation.
+  FailureDetector::NodeStatus GetNodeStatusUnlocked(const std::string& name,
+                                                    const MonoTime& now);
+
+  const MonoDelta failure_period_;
+  mutable simple_spinlock lock_;
+  NodeMap nodes_;
+
+  DISALLOW_COPY_AND_ASSIGN(TimedFailureDetector);
+};
+
+// Periodically runs CheckForFailures() on every failure detector it monitors,
+// waking up at normally-distributed intervals.
+//
+// The wake-up interval is drawn from a normal distribution with the given
+// mean and standard deviation (both in milliseconds); the smallest possible
+// value is pinned at kMinWakeUpTimeMillis.
+//
+// The interval is randomized to avoid thundering herd / lockstep problems
+// when multiple nodes react to the failure of another node.
+class RandomizedFailureMonitor {
+ public:
+  // Lower bound on the time the monitor waits between wake-ups.
+  static const int64_t kMinWakeUpTimeMillis;
+
+  RandomizedFailureMonitor(uint32_t random_seed,
+                           int64_t period_mean_millis,
+                           int64_t period_std_dev_millis);
+  ~RandomizedFailureMonitor();
+
+  // Starts the failure monitor.
+  Status Start();
+
+  // Stops the failure monitor.
+  void Shutdown();
+
+  // Registers 'fd' under 'name' so that it gets checked periodically.
+  Status MonitorFailureDetector(const std::string& name,
+                                const scoped_refptr<FailureDetector>& fd);
+
+  // Unregisters the failure detector with the specified name.
+  Status UnmonitorFailureDetector(const std::string& name);
+
+ private:
+  using FDMap = std::unordered_map<std::string, scoped_refptr<FailureDetector> >;
+
+  // Body of the monitor thread.
+  void RunThread();
+
+  // Mean and standard deviation of the random period slept between two
+  // rounds of failure detector checks.
+  const int64_t period_mean_millis_;
+  const int64_t period_stddev_millis_;
+  ThreadSafeRandom random_;
+
+  scoped_refptr<Thread> thread_;
+  CountDownLatch run_latch_;
+
+  // RunThread() reads fds_ and shutdown_ while holding lock_.
+  mutable simple_spinlock lock_;
+  FDMap fds_;
+  bool shutdown_; // Whether the failure monitor should shut down.
+
+  DISALLOW_COPY_AND_ASSIGN(RandomizedFailureMonitor);
+};
+
+}  // namespace kudu
+
+#endif /* KUDU_UTIL_FAILURE_DETECTOR_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/faststring-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/faststring-test.cc 
b/be/src/kudu/util/faststring-test.cc
new file mode 100644
index 0000000..c57cb41
--- /dev/null
+++ b/be/src/kudu/util/faststring-test.cc
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include "kudu/util/faststring.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+class FaststringTest : public KuduTest {};  // Fixture for the faststring tests below; adds nothing to KuduTest.
+
+// shrink_to_fit() on an empty string must leave the capacity at the size of
+// the built-in array.
+TEST_F(FaststringTest, TestShrinkToFit_Empty) {
+  faststring str;
+  str.shrink_to_fit();
+  ASSERT_EQ(faststring::kInitialCapacity, str.capacity());
+}
+
+// When the contents are shorter than the initial capacity of the faststring,
+// shrink_to_fit() must keep the string in the built-in array, so the
+// reported capacity stays at kInitialCapacity.
+TEST_F(FaststringTest, TestShrinkToFit_SmallerThanInitialCapacity) {
+  faststring str;
+  str.append("hello");
+  str.shrink_to_fit();
+  ASSERT_EQ(faststring::kInitialCapacity, str.capacity());
+}
+
+// Resizes a faststring to 100 random lengths below twice the initial
+// capacity, fills it with known bytes, and verifies that shrink_to_fit()
+// preserves the contents and leaves the capacity at
+// max(kInitialCapacity, length).
+TEST_F(FaststringTest, TestShrinkToFit_Random) {
+  Random rng(GetRandomSeed32());
+  const int kMaxSize = faststring::kInitialCapacity * 2;
+  std::unique_ptr<char[]> source_bytes(new char[kMaxSize]);
+  RandomString(source_bytes.get(), kMaxSize, &rng);
+
+  faststring str;
+  for (int round = 0; round < 100; ++round) {
+    int length = rng.Uniform(kMaxSize);
+    str.resize(length);
+    memcpy(str.data(), source_bytes.get(), length);
+    str.shrink_to_fit();
+    ASSERT_EQ(0, memcmp(str.data(), source_bytes.get(), length));
+    ASSERT_EQ(std::max<int>(faststring::kInitialCapacity, length), str.capacity());
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/faststring.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/faststring.cc b/be/src/kudu/util/faststring.cc
new file mode 100644
index 0000000..a1cd26b
--- /dev/null
+++ b/be/src/kudu/util/faststring.cc
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/faststring.h"
+
+#include <glog/logging.h>
+#include <memory>
+
+namespace kudu {
+
+// Slow path taken when an append of 'count' bytes does not fit in the
+// current buffer.
+//
+// Reserving exactly enough space for the new contents would make it too easy
+// to write perf bugs with O(n^2) appends, so the buffer is always grown to at
+// least 1.5x the current length.
+void faststring::GrowByAtLeast(size_t count) {
+  const size_t exact_fit = len_ + count;
+  const size_t grown_by_half = len_ * 3 / 2;
+  GrowArray(exact_fit < grown_by_half ? grown_by_half : exact_fit);
+}
+
+// Moves the contents into a newly allocated heap array of 'newcapacity'
+// bytes. 'newcapacity' must not be smaller than the current capacity.
+void faststring::GrowArray(size_t newcapacity) {
+  DCHECK_GE(newcapacity, capacity_);
+  std::unique_ptr<uint8_t[]> replacement(new uint8_t[newcapacity]);
+  if (len_ > 0) {
+    memcpy(replacement.get(), data_, len_);
+  }
+
+  if (data_ == initial_data_) {
+    // The built-in array is no longer in use; poison all of it for ASAN.
+    ASAN_POISON_MEMORY_REGION(initial_data_, arraysize(initial_data_));
+  } else {
+    delete[] data_;
+  }
+
+  data_ = replacement.release();
+  capacity_ = newcapacity;
+  // Only the first len_ bytes hold valid data; poison the unused tail.
+  ASAN_POISON_MEMORY_REGION(data_ + len_, capacity_ - len_);
+}
+
+// Out-of-line part of shrink_to_fit(); requires that the data currently
+// lives on the heap.
+//
+// Contents of at most kInitialCapacity bytes are moved back into the built-in
+// array; longer contents are moved into a heap array of exactly len_ bytes.
+void faststring::ShrinkToFitInternal() {
+  DCHECK_NE(data_, initial_data_);
+  uint8_t* const old_data = data_;
+  if (len_ > kInitialCapacity) {
+    std::unique_ptr<uint8_t[]> exact_fit(new uint8_t[len_]);
+    memcpy(exact_fit.get(), old_data, len_);
+    data_ = exact_fit.release();
+    capacity_ = len_;
+  } else {
+    // The built-in array may be poisoned; make the bytes about to be written
+    // accessible again.
+    ASAN_UNPOISON_MEMORY_REGION(initial_data_, len_);
+    memcpy(initial_data_, old_data, len_);
+    data_ = initial_data_;
+    capacity_ = kInitialCapacity;
+  }
+  delete[] old_data;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/faststring.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/faststring.h b/be/src/kudu/util/faststring.h
new file mode 100644
index 0000000..3d25c84
--- /dev/null
+++ b/be/src/kudu/util/faststring.h
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_FASTSTRING_H
+#define KUDU_UTIL_FASTSTRING_H
+
+#include <string>
+
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/strings/fastmem.h"
+
+namespace kudu {
+
+// A faststring is similar to a std::string, except that it is faster for many
+// common use cases (in particular, resize() will fill with uninitialized data
+// instead of memsetting to \0).
+//
+// Up to kInitialCapacity bytes are stored in an array embedded in the object;
+// longer contents live in a heap array allocated with new[].
+class faststring {
+ public:
+  enum {
+    kInitialCapacity = 32
+  };
+
+  // Construct an empty string backed by the built-in array.
+  faststring() :
+    data_(initial_data_),
+    len_(0),
+    capacity_(kInitialCapacity) {
+  }
+
+  // Construct a string with the given capacity, in bytes.
+  //
+  // A heap array is only allocated when 'capacity' exceeds kInitialCapacity.
+  explicit faststring(size_t capacity)
+    : data_(initial_data_),
+      len_(0),
+      capacity_(kInitialCapacity) {
+    if (capacity > capacity_) {
+      data_ = new uint8_t[capacity];
+      capacity_ = capacity;
+    }
+    ASAN_POISON_MEMORY_REGION(data_, capacity_);
+  }
+
+  ~faststring() {
+    ASAN_UNPOISON_MEMORY_REGION(initial_data_, arraysize(initial_data_));
+    if (data_ != initial_data_) {
+      delete[] data_;
+    }
+  }
+
+  // Reset the valid length of the string to 0.
+  //
+  // This does not free up any memory. The capacity of the string remains unchanged.
+  void clear() {
+    resize(0);
+    ASAN_POISON_MEMORY_REGION(data_, capacity_);
+  }
+
+  // Resize the string to the given length.
+  // If the new length is larger than the old length, the capacity is expanded as necessary.
+  //
+  // NOTE: in contrast to std::string's implementation, any newly "exposed" bytes of data are
+  // not cleared.
+  void resize(size_t newsize) {
+    if (newsize > capacity_) {
+      reserve(newsize);
+    }
+    len_ = newsize;
+    ASAN_POISON_MEMORY_REGION(data_ + len_, capacity_ - len_);
+    ASAN_UNPOISON_MEMORY_REGION(data_, len_);
+  }
+
+  // Releases the underlying array; after this, the buffer is left empty.
+  //
+  // The returned array was allocated with new[] and is no longer owned by this
+  // object, so the caller is responsible for freeing it with delete[].
+  //
+  // NOTE: the data pointer returned by release() is not necessarily the pointer
+  // previously returned by data(): if the contents were stored in the built-in
+  // array, they are copied into a newly allocated array of length() bytes.
+  uint8_t *release() WARN_UNUSED_RESULT {
+    uint8_t *ret = data_;
+    if (ret == initial_data_) {
+      ret = new uint8_t[len_];
+      memcpy(ret, data_, len_);
+    }
+    len_ = 0;
+    capacity_ = kInitialCapacity;
+    data_ = initial_data_;
+    ASAN_POISON_MEMORY_REGION(data_, capacity_);
+    return ret;
+  }
+
+  // Reserve space for the given total amount of data. If the current capacity is already
+  // larger than the newly requested capacity, this is a no-op (i.e. it does not ever free memory).
+  //
+  // NOTE: even though the new capacity is reserved, it is illegal to begin writing into that memory
+  // directly using pointers. If ASAN is enabled, this is ensured using manual memory poisoning.
+  void reserve(size_t newcapacity) {
+    if (PREDICT_TRUE(newcapacity <= capacity_)) return;
+    GrowArray(newcapacity);
+  }
+
+  // Append the given data to the string, resizing capacity as necessary.
+  void append(const void *src_v, size_t count) {
+    const uint8_t *src = reinterpret_cast<const uint8_t *>(src_v);
+    EnsureRoomForAppend(count);
+    ASAN_UNPOISON_MEMORY_REGION(data_ + len_, count);
+
+    // appending short values is common enough that this
+    // actually helps, according to benchmarks. In theory
+    // memcpy_inlined should already be just as good, but this
+    // was ~20% faster for reading a large prefix-coded string file
+    // where each string was only a few chars different
+    if (count <= 4) {
+      uint8_t *p = &data_[len_];
+      // The index has the same (unsigned) type as 'count' to avoid a
+      // signed/unsigned comparison.
+      for (size_t i = 0; i < count; i++) {
+        *p++ = *src++;
+      }
+    } else {
+      strings::memcpy_inlined(&data_[len_], src, count);
+    }
+    len_ += count;
+  }
+
+  // Append the given string to this string.
+  void append(const std::string &str) {
+    append(str.data(), str.size());
+  }
+
+  // Append the given character to this string.
+  void push_back(const char byte) {
+    EnsureRoomForAppend(1);
+    ASAN_UNPOISON_MEMORY_REGION(data_ + len_, 1);
+    data_[len_] = byte;
+    len_++;
+  }
+
+  // Return the valid length of this string.
+  size_t length() const {
+    return len_;
+  }
+
+  // Return the valid length of this string (identical to length())
+  size_t size() const {
+    return len_;
+  }
+
+  // Return the allocated capacity of this string.
+  size_t capacity() const {
+    return capacity_;
+  }
+
+  // Return a pointer to the data in this string. Note that this pointer
+  // may be invalidated by any later non-const operation.
+  const uint8_t *data() const {
+    return &data_[0];
+  }
+
+  // Return a pointer to the data in this string. Note that this pointer
+  // may be invalidated by any later non-const operation.
+  uint8_t *data() {
+    return &data_[0];
+  }
+
+  // Return the given element of this string. Note that this does not perform
+  // any bounds checking.
+  const uint8_t &at(size_t i) const {
+    return data_[i];
+  }
+
+  // Return the given element of this string. Note that this does not perform
+  // any bounds checking.
+  const uint8_t &operator[](size_t i) const {
+    return data_[i];
+  }
+
+  // Return the given element of this string. Note that this does not perform
+  // any bounds checking.
+  uint8_t &operator[](size_t i) {
+    return data_[i];
+  }
+
+  // Reset the contents of this string by copying 'len' bytes from 'src'.
+  void assign_copy(const uint8_t *src, size_t len) {
+    // Reset length so that the first resize doesn't need to copy the current
+    // contents of the array.
+    len_ = 0;
+    resize(len);
+    memcpy(data(), src, len);
+  }
+
+  // Reset the contents of this string by copying from the given std::string.
+  void assign_copy(const std::string &str) {
+    assign_copy(reinterpret_cast<const uint8_t *>(str.c_str()),
+                str.size());
+  }
+
+  // Reallocates the internal storage to fit only the current data.
+  //
+  // This may revert to using internal storage if the current length is shorter than
+  // kInitialCapacity. Note that, in that case, after this call, capacity() will return
+  // a capacity larger than the data length.
+  //
+  // Any pointers within this instance are invalidated.
+  void shrink_to_fit() {
+    if (data_ == initial_data_ || capacity_ == len_) return;
+    ShrinkToFitInternal();
+  }
+
+  // Return a copy of this string as a std::string.
+  std::string ToString() const {
+    return std::string(reinterpret_cast<const char *>(data()),
+                       len_);
+  }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(faststring);
+
+  // If necessary, expand the buffer to fit at least 'count' more bytes.
+  // If the array has to be grown, it is grown by at least 50%.
+  void EnsureRoomForAppend(size_t count) {
+    if (PREDICT_TRUE(len_ + count <= capacity_)) {
+      return;
+    }
+
+    // Call the non-inline slow path - this reduces the number of instructions
+    // on the hot path.
+    GrowByAtLeast(count);
+  }
+
+  // The slow path of EnsureRoomForAppend. Grows the buffer by either
+  // 'count' bytes, or 50%, whichever is more.
+  void GrowByAtLeast(size_t count);
+
+  // Grow the array to the given capacity, which must be more than
+  // the current capacity.
+  void GrowArray(size_t newcapacity);
+
+  // Out-of-line part of shrink_to_fit(); only called when the data lives on
+  // the heap and the capacity exceeds the length.
+  void ShrinkToFitInternal();
+
+  // Points either at initial_data_ or at a heap array allocated with new[].
+  uint8_t* data_;
+  uint8_t initial_data_[kInitialCapacity];
+  size_t len_;
+  size_t capacity_;
+};
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/fault_injection.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/fault_injection.cc 
b/be/src/kudu/util/fault_injection.cc
new file mode 100644
index 0000000..a14c8f3
--- /dev/null
+++ b/be/src/kudu/util/fault_injection.cc
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/fault_injection.h"
+
+#include <stdlib.h>
+#include <sys/time.h>
+
+#include "kudu/gutil/once.h"
+#include "kudu/util/debug/leakcheck_disabler.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/os-util.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
+
+namespace kudu {
+namespace fault_injection {
+
+namespace {
+// Guards the one-time execution of InitRandom().
+GoogleOnceType g_random_once;
+// Random generator shared by the fault injection functions in this file.
+// Allocated by InitRandom() and never deleted here.
+Random* g_random;
+
+// One-time initializer: logs loud warnings that fault injection is active and
+// creates the shared random generator.
+void InitRandom() {
+  LOG(WARNING) << "FAULT INJECTION ENABLED!";
+  LOG(WARNING) << "THIS SERVER MAY CRASH!";
+
+  // The allocation below is made while leak checking is disabled.
+  debug::ScopedLeakCheckDisabler leak_check_disabler;
+  g_random = new Random(GetRandomSeed32());
+  ANNOTATE_BENIGN_RACE_SIZED(g_random, sizeof(Random),
+                             "Racy random numbers are OK");
+}
+
+} // anonymous namespace
+
+// Out-of-line part of MaybeFault(): draws a random fraction and, unless it is
+// >= 'fraction', logs 'fault_str' and terminates the process with kExitStatus.
+void DoMaybeFault(const char* fault_str, double fraction) {
+  GoogleOnceInit(&g_random_once, InitRandom);
+  const double roll = g_random->NextDoubleFraction();
+  if (PREDICT_TRUE(roll >= fraction)) {
+    return;
+  }
+  LOG(ERROR) << "Injecting fault: " << fault_str << " (process will exit)";
+  // _exit() terminates the program without running atexit handlers, which
+  // simulates a crash more accurately.
+  _exit(kExitStatus);
+}
+
+// Out-of-line part of MaybeInjectRandomLatency(): sleeps for a random
+// fraction of 'max_latency_ms' milliseconds.
+void DoInjectRandomLatency(double max_latency_ms) {
+  GoogleOnceInit(&g_random_once, InitRandom);
+  const double latency_ms = g_random->NextDoubleFraction() * max_latency_ms;
+  SleepFor(MonoDelta::FromMilliseconds(latency_ms));
+}
+
+// Out-of-line part of MaybeReturnFailure(): draws a random fraction and
+// returns Status::OK() if it is >= 'fraction', otherwise a copy of
+// 'bad_status_to_return'.
+Status DoMaybeReturnFailure(double fraction,
+                            const Status& bad_status_to_return) {
+  GoogleOnceInit(&g_random_once, InitRandom);
+  const double roll = g_random->NextDoubleFraction();
+  if (PREDICT_TRUE(roll >= fraction)) {
+    return Status::OK();
+  }
+  return bad_status_to_return;
+}
+
+} // namespace fault_injection
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/fault_injection.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/fault_injection.h 
b/be/src/kudu/util/fault_injection.h
new file mode 100644
index 0000000..cc22bab
--- /dev/null
+++ b/be/src/kudu/util/fault_injection.h
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_FAULT_INJECTION_H
+#define KUDU_UTIL_FAULT_INJECTION_H
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/status.h"
+
+// Macros for injecting various kinds of faults with varying probability. If
+// configured with 0 probability, each of these macros is evaluated inline and
+// is fast enough to run even in hot code paths.
+
+// With some probability, crash at the current point in the code
+// by logging an error and calling _exit() with kExitStatus.
+//
+// The probability is determined by the 'fraction_flag' argument.
+//
+// Typical usage:
+//
+//   DEFINE_double(fault_crash_before_foo, 0.0,
+//                 "Fraction of the time when we will crash before doing foo");
+//   TAG_FLAG(fault_crash_before_foo, unsafe);
+#define MAYBE_FAULT(fraction_flag) \
+  kudu::fault_injection::MaybeFault(AS_STRING(fraction_flag), fraction_flag)
+
+// Inject a uniformly random amount of latency between 0 and the configured
+// number of milliseconds. Does nothing when the configured value is <= 0.
+#define MAYBE_INJECT_RANDOM_LATENCY(max_ms_flag) \
+  kudu::fault_injection::MaybeInjectRandomLatency(max_ms_flag)
+
+// With some probability, return the failure described by 'status_expr'.
+//
+// Unlike the other MAYBE_ macros, this one does not chain to an inline
+// function so that 'status_expr' isn't evaluated unless 'fraction_flag'
+// really is non-zero.
+#define MAYBE_RETURN_FAILURE(fraction_flag, status_expr) \
+  static const Status status_eval = (status_expr); \
+  RETURN_NOT_OK(kudu::fault_injection::MaybeReturnFailure(fraction_flag, 
status_eval));
+
+// Implementation details below.
+// Use the MAYBE_FAULT macro instead.
+namespace kudu {
+namespace fault_injection {
+
+// The exit status returned from a process exiting due to a fault.
+// The choice of value here is arbitrary: it just needs to be something that
+// wouldn't normally be returned by a non-fault-injection code path.
+constexpr int kExitStatus = 85;
+
+// Out-of-line implementations, called by the inline wrappers below.
+void DoMaybeFault(const char* fault_str, double fraction);
+void DoInjectRandomLatency(double max_latency_ms);
+Status DoMaybeReturnFailure(double fraction,
+                            const Status& bad_status_to_return);
+
+// Inline fast path for MAYBE_FAULT: nothing happens when 'fraction' is <= 0;
+// otherwise the decision is delegated to DoMaybeFault().
+inline void MaybeFault(const char* fault_str, double fraction) {
+  if (!PREDICT_TRUE(fraction <= 0)) {
+    DoMaybeFault(fault_str, fraction);
+  }
+}
+
+// Inline fast path for MAYBE_INJECT_RANDOM_LATENCY: nothing happens when
+// 'max_latency' is <= 0; otherwise DoInjectRandomLatency() does the sleeping.
+inline void MaybeInjectRandomLatency(double max_latency) {
+  if (!PREDICT_TRUE(max_latency <= 0)) {
+    DoInjectRandomLatency(max_latency);
+  }
+}
+
+// Inline fast path for MAYBE_RETURN_FAILURE: returns Status::OK() when
+// 'fraction' is <= 0; otherwise DoMaybeReturnFailure() decides whether
+// 'bad_status_to_return' is returned.
+inline Status MaybeReturnFailure(double fraction,
+                                 const Status& bad_status_to_return) {
+  if (!PREDICT_TRUE(fraction <= 0)) {
+    return DoMaybeReturnFailure(fraction, bad_status_to_return);
+  }
+  return Status::OK();
+}
+
+} // namespace fault_injection
+} // namespace kudu
+#endif /* KUDU_UTIL_FAULT_INJECTION_H */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/file_cache-stress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/file_cache-stress-test.cc 
b/be/src/kudu/util/file_cache-stress-test.cc
new file mode 100644
index 0000000..966f202
--- /dev/null
+++ b/be/src/kudu/util/file_cache-stress-test.cc
@@ -0,0 +1,390 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/file_cache.h"
+
+#include <deque>
+#include <iterator>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/env.h"
+#include "kudu/util/file_cache-test-util.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/oid_generator.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/random.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_util.h"
+
+// Like CHECK_OK(), but dumps the contents of the cache before failing.
+//
+// The output of ToDebugString() tends to be long enough that LOG() truncates
+// it, so we must split it ourselves before logging.
+//
+// The expansion refers to 'cache_', so the macro can only be used where a
+// member of that name is in scope. 'to_call' is evaluated exactly once.
+//
+// The macro deliberately does not end in a semicolon: the caller supplies it,
+// which keeps the expansion a single statement (safe in unbraced if/else).
+#define TEST_CHECK_OK(to_call) do {                                       \
+    const Status& _s = (to_call);                                         \
+    if (!_s.ok()) {                                                       \
+      LOG(INFO) << "Dumping cache contents";                              \
+      vector<string> lines = strings::Split(cache_.ToDebugString(), "\n", \
+                                            strings::SkipEmpty());        \
+      for (const auto& l : lines) {                                       \
+        LOG(INFO) << l;                                                   \
+      }                                                                   \
+    }                                                                     \
+    CHECK(_s.ok()) << "Bad status: " << _s.ToString();                    \
+  } while (0)
+
+// Capacity handed to the FileCache under test, and one term of the open-fd
+// upper bound computed in TestStress.
+//
+// This default value is friendly to many n-CPU configurations.
+DEFINE_int32(test_max_open_files, 192, "Maximum number of open files enforced "
+             "by the cache. Should be a multiple of the number of CPUs on the "
+             "system.");
+
+// Thread counts and run time for TestStress. TestStress overrides all three
+// via OverrideFlagForSlowTests().
+DEFINE_int32(test_num_producer_threads, 1, "Number of producer threads");
+DEFINE_int32(test_num_consumer_threads, 4, "Number of consumer threads");
+DEFINE_int32(test_duration_secs, 2, "Number of seconds to run the test");
+
+using std::deque;
+using std::shared_ptr;
+using std::thread;
+using std::unique_ptr;
+using std::unordered_map;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+template <class FileType>
+class FileCacheStressTest : public KuduTest {
+ public:
+  typedef unordered_map<string, unordered_map<string, int>> MetricMap;
+
+  // Constructs the cache under test on top of 'env_', passing
+  // FLAGS_test_max_open_files and an empty metric entity reference, seeds the
+  // shared PRNG from SeedRandom(), and arms the 'running_' latch with a count
+  // of one.
+  FileCacheStressTest()
+      : cache_("test",
+               env_,
+               FLAGS_test_max_open_files,
+               scoped_refptr<MetricEntity>()),
+        rand_(SeedRandom()),
+        running_(1) {
+  }
+
+  // Test fixture setup: initializes the cache, asserting that Init() succeeds.
+  void SetUp() override {
+    ASSERT_OK(cache_.Init());
+  }
+
+  // Body of a producer thread.
+  //
+  // Repeatedly creates a uniquely-named file in the test directory, appends a
+  // chunk of random data to it, and publishes its name in 'available_files_'
+  // (with zero openers) for the consumer threads. The loop ends once
+  // 'running_' is counted down; the thread then merges its "create" metrics
+  // into the global map.
+  void ProducerThread() {
+    // Each thread has its own PRNG, seeded from the shared one.
+    Random rand(rand_.Next32());
+    ObjectIdGenerator oid_generator;
+    MetricMap metrics;
+
+    do {
+      // Create a new file with some (0-32k) random data in it.
+      string next_file_name = GetTestPath(oid_generator.Next());
+      {
+        unique_ptr<WritableFile> next_file;
+        CHECK_OK(env_->NewWritableFile(next_file_name, &next_file));
+
+        // The buffer size is only known at runtime, so allocate it on the heap
+        // rather than relying on a variable-length stack array (a compiler
+        // extension, not standard C++). Heap storage is also suitably aligned
+        // for the uint32_t stores performed by GenerateRandomChunk().
+        const size_t buf_size = rand.Uniform((32 * 1024) - 1) + 1;
+        unique_ptr<uint8_t[]> buf(new uint8_t[buf_size]);
+        CHECK_OK(next_file->Append(
+            GenerateRandomChunk(buf.get(), buf_size, &rand)));
+        CHECK_OK(next_file->Close());
+      }
+      {
+        std::lock_guard<simple_spinlock> l(lock_);
+        InsertOrDie(&available_files_, next_file_name, 0);
+      }
+      metrics[BaseName(next_file_name)]["create"] = 1;
+    } while (!running_.WaitFor(MonoDelta::FromMilliseconds(1)));
+
+    // Update the global metrics map.
+    MergeNewMetrics(std::move(metrics));
+  }
+
+  // Body of a consumer thread.
+  //
+  // Until 'running_' is counted down, repeatedly picks one of five actions at
+  // random (open, close, read, write, delete) and applies it through the
+  // cache. Per-thread action counts are merged into the global metrics map
+  // just before the thread returns.
+  void ConsumerThread() {
+    // A private PRNG per thread keeps contention on the shared one low.
+    Random rand(rand_.Next32());
+
+    // Files currently held open by this thread, in the order they were opened.
+    deque<shared_ptr<FileType>> open_files;
+
+    // Action counts accumulated by this thread; folded into the main metrics
+    // map once the loop has finished.
+    MetricMap local_metrics;
+
+    do {
+      // Choose what to do next. Distribution:
+      // 20% open
+      // 15% close
+      // 35% read
+      // 20% write
+      // 10% delete
+      const int action = rand.Uniform(100);
+
+      if (action < 20) {
+        // Open an existing file, provided one is available.
+        string path;
+        if (GetRandomFile(OPEN, &rand, &path)) {
+          shared_ptr<FileType> opened;
+          TEST_CHECK_OK(cache_.OpenExistingFile(path, &opened));
+          FinishedOpen(path);
+          local_metrics[BaseName(path)]["open"]++;
+          open_files.emplace_back(opened);
+        }
+      } else if (action < 35) {
+        // Close a file: drop this thread's reference to the one it opened
+        // longest ago, if it holds any.
+        if (!open_files.empty()) {
+          shared_ptr<FileType> oldest = open_files.front();
+          open_files.pop_front();
+          local_metrics[BaseName(oldest->filename())]["close"]++;
+        }
+      } else if (action < 70) {
+        // Read a random chunk from one of the open files.
+        TEST_CHECK_OK(ReadRandomChunk(open_files, &local_metrics, &rand));
+      } else if (action < 90) {
+        // Write a random chunk to one of the open files.
+        TEST_CHECK_OK(WriteRandomChunk(open_files, &local_metrics, &rand));
+      } else if (action < 100) {
+        // Delete a file, provided one is eligible.
+        string path;
+        if (GetRandomFile(DELETE, &rand, &path)) {
+          TEST_CHECK_OK(cache_.DeleteFile(path));
+          local_metrics[BaseName(path)]["delete"]++;
+        }
+      }
+    } while (!running_.WaitFor(MonoDelta::FromMilliseconds(1)));
+
+    // Publish this thread's counts to the global metrics map.
+    MergeNewMetrics(std::move(local_metrics));
+  }
+
+ protected:
+  // Counts down the 'running_' latch, which the producer and consumer loops
+  // wait on as their exit condition.
+  void NotifyThreads() { running_.CountDown(); }
+
+  // Returns the global metrics map. No lock is taken here; TestStress reads it
+  // only after joining all worker threads.
+  const MetricMap& metrics() const { return metrics_; }
+
+ private:
+  // Tells GetRandomFile() what the caller intends to do with the file.
+  enum GetMode {
+    // The file will be opened; its in-progress opener count is incremented.
+    OPEN,
+    // The file will be deleted; its entry is removed from 'available_files_'.
+    DELETE
+  };
+
+  // Chooses a random entry of 'available_files_' for the caller to open or
+  // delete and stores its name in 'out'.
+  //
+  // In OPEN mode the entry's opener count is bumped; the caller is expected to
+  // call FinishedOpen() afterwards. In DELETE mode the entry is removed, which
+  // hides the name from all future operations.
+  //
+  // Returns false, leaving 'out' untouched, if there are no files at all or if
+  // the chosen file cannot be deleted because it still has openers.
+  bool GetRandomFile(GetMode mode, Random* rand, string* out) {
+    std::lock_guard<simple_spinlock> l(lock_);
+    if (available_files_.empty()) {
+      return false;
+    }
+
+    // Walking to the chosen entry is linear time, but that is simpler than
+    // maintaining a second, index-friendly data structure.
+    auto chosen = std::next(available_files_.begin(),
+                            rand->Uniform(available_files_.size()));
+
+    // A file that is still in the middle of being opened must not be deleted.
+    const bool has_openers = chosen->second > 0;
+    if (mode == DELETE && has_openers) {
+      return false;
+    }
+
+    *out = chosen->first;
+    if (mode != OPEN) {
+      available_files_.erase(chosen);
+    } else {
+      chosen->second++;
+    }
+    return true;
+  }
+
+  // Records that an open started via GetRandomFile(OPEN, ...) has completed by
+  // decrementing the opener count of 'opened'. Once the count is back to zero
+  // the file becomes eligible for deletion again. 'opened' must be present in
+  // 'available_files_'.
+  void FinishedOpen(const string& opened) {
+    std::lock_guard<simple_spinlock> l(lock_);
+    --FindOrDie(available_files_, opened);
+  }
+
+  // Picks a random file from 'files' and reads a randomly positioned, randomly
+  // sized chunk from it into a scratch buffer that is then discarded.
+  //
+  // Returns OK without doing anything if 'files' is empty. Otherwise returns
+  // the first error from Size() or Read(), or OK after incrementing the
+  // file's "read" counter in 'metrics'.
+  static Status ReadRandomChunk(const deque<shared_ptr<FileType>>& files,
+                                MetricMap* metrics,
+                                Random* rand) {
+    if (files.empty()) {
+      return Status::OK();
+    }
+    const size_t idx = rand->Uniform(files.size());
+    const shared_ptr<FileType>& target = files[idx];
+
+    uint64_t file_size;
+    RETURN_NOT_OK(target->Size(&file_size));
+
+    // An empty file gets a zero-length read at offset zero; no random numbers
+    // are drawn for it.
+    uint64_t off = 0;
+    size_t len = 0;
+    if (file_size > 0) {
+      off = rand->Uniform(file_size);
+      len = rand->Uniform(file_size - off);
+    }
+
+    unique_ptr<uint8_t[]> scratch(new uint8_t[len]);
+    Slice s(scratch.get(), len);
+    RETURN_NOT_OK(target->Read(off, &s));
+
+    ++(*metrics)[BaseName(target->filename())]["read"];
+    return Status::OK();
+  }
+
+  // Writes a random chunk of data to a random file in 'files'. On success,
+  // updates 'metrics'.
+  //
+  // No-op for file implementations that don't support writing.
+  //
+  // Only declared here: explicit specializations for RWFile and
+  // RandomAccessFile are defined after the class template.
+  static Status WriteRandomChunk(const deque<shared_ptr<FileType>>& files,
+                                 MetricMap* metrics,
+                                 Random* rand);
+
+  // Fills a prefix of 'buffer' with random data and returns a Slice over that
+  // prefix.
+  //
+  // The prefix length is drawn from rand->Uniform(max_length) and then rounded
+  // down to a multiple of sizeof(uint32_t), so the returned slice may be
+  // empty. Every 32-bit word of the prefix is written with a fresh value from
+  // rand->Next32(), so the slice contains no uninitialized bytes.
+  //
+  // 'buffer' must have room for at least 'max_length' bytes.
+  // NOTE(review): the words are stored through a uint32_t*, so 'buffer' is
+  // assumed to be suitably aligned for uint32_t -- confirm at call sites.
+  static Slice GenerateRandomChunk(uint8_t* buffer, size_t max_length,
+                                   Random* rand) {
+    size_t len = rand->Uniform(max_length);
+    len -= len % sizeof(uint32_t);
+
+    // Index by word and advance one word at a time. (Stepping the word index
+    // by sizeof(uint32_t) would leave three of every four words, and the whole
+    // tail of the chunk, unwritten.)
+    uint32_t* words = reinterpret_cast<uint32_t*>(buffer);
+    const size_t num_words = len / sizeof(uint32_t);
+    for (size_t i = 0; i < num_words; i++) {
+      words[i] = rand->Next32();
+    }
+    return Slice(buffer, len);
+  }
+
+  // Adds every (file, action, count) triple of 'new_metrics' to the global
+  // metric map, holding 'lock_' for the duration of the merge.
+  void MergeNewMetrics(MetricMap new_metrics) {
+    std::lock_guard<simple_spinlock> l(lock_);
+    for (const auto& per_file : new_metrics) {
+      const string& file_name = per_file.first;
+      for (const auto& per_action : per_file.second) {
+        const string& action = per_action.first;
+        const int count = per_action.second;
+        metrics_[file_name][action] += count;
+      }
+    }
+  }
+
+  // The file cache under test. Constructed with FLAGS_test_max_open_files and
+  // initialized in SetUp().
+  FileCache<FileType> cache_;
+
+  // Used to seed per-thread PRNGs.
+  ThreadSafeRandom rand_;
+
+  // Drops to zero when the test ends.
+  CountDownLatch running_;
+
+  // Protects 'available_files_' and 'metrics_'.
+  simple_spinlock lock_;
+
+  // Contains files produced by producer threads and ready for consumption by
+  // consumer threads.
+  //
+  // Each entry is a file name and the number of in-progress openers. To delete
+  // a file, there must be no openers.
+  unordered_map<string, int> available_files_;
+
+  // For each file name, tracks the count of consumer actions performed.
+  //
+  // Only updated at test end.
+  MetricMap metrics_;
+};
+
+// RWFile specialization: picks a random file from 'files' and overwrites a
+// randomly positioned region of it with a chunk of fewer than 64 random bytes.
+//
+// Returns OK without doing anything if 'files' is empty. Otherwise returns the
+// first error from Size() or Write(), or OK after incrementing the file's
+// "write" counter in 'metrics'.
+template <>
+Status FileCacheStressTest<RWFile>::WriteRandomChunk(
+    const deque<shared_ptr<RWFile>>& files,
+    MetricMap* metrics,
+    Random* rand) {
+  if (files.empty()) {
+    return Status::OK();
+  }
+  const shared_ptr<RWFile>& file = files[rand->Uniform(files.size())];
+
+  uint64_t file_size;
+  RETURN_NOT_OK(file->Size(&file_size));
+  uint64_t off = file_size > 0 ? rand->Uniform(file_size) : 0;
+
+  // GenerateRandomChunk() stores 32-bit words through a uint32_t*, so the
+  // byte buffer is explicitly aligned for that type. uint8_t matches the
+  // parameter type of GenerateRandomChunk() and the rest of this file.
+  alignas(uint32_t) uint8_t buf[64];
+  RETURN_NOT_OK(file->Write(off, GenerateRandomChunk(buf, sizeof(buf), rand)));
+  (*metrics)[BaseName(file->filename())]["write"]++;
+  return Status::OK();
+}
+
+// RandomAccessFile specialization: a no-op that ignores all of its arguments
+// and always returns OK, so the "write" action in ConsumerThread() does
+// nothing for this file type.
+template <>
+Status FileCacheStressTest<RandomAccessFile>::WriteRandomChunk(
+    const deque<shared_ptr<RandomAccessFile>>& /* unused */,
+    MetricMap* /* unused */,
+    Random* /* unused */) {
+  return Status::OK();
+}
+
+// The file types that FileCacheStressTest is instantiated with; each typed
+// test runs once per type in this list.
+typedef ::testing::Types<RWFile, RandomAccessFile> FileTypes;
+TYPED_TEST_CASE(FileCacheStressTest, FileTypes);
+
+// Runs producer and consumer threads against the cache for
+// FLAGS_test_duration_secs while a PeriodicOpenFdChecker watches the process'
+// open fd count, then stops everything and logs per-action totals.
+TYPED_TEST(FileCacheStressTest, TestStress) {
+  OverrideFlagForSlowTests("test_num_producer_threads", "2");
+  OverrideFlagForSlowTests("test_num_consumer_threads", "8");
+  OverrideFlagForSlowTests("test_duration_secs", "30");
+
+  // Start the fd checker. The bound passed to it is the sum of:
+  const int fd_upper_bound =
+      FLAGS_test_max_open_files +        // cache capacity
+      FLAGS_test_num_producer_threads +  // files being written
+      FLAGS_test_num_consumer_threads;   // files being opened
+  PeriodicOpenFdChecker checker(this->env_, fd_upper_bound);
+  checker.Start();
+
+  // Start the worker threads.
+  typedef FileCacheStressTest<TypeParam> Fixture;
+  vector<thread> producers;
+  for (int n = 0; n < FLAGS_test_num_producer_threads; n++) {
+    producers.emplace_back(&Fixture::ProducerThread, this);
+  }
+  vector<thread> consumers;
+  for (int n = 0; n < FLAGS_test_num_consumer_threads; n++) {
+    consumers.emplace_back(&Fixture::ConsumerThread, this);
+  }
+
+  // Let the test run.
+  SleepFor(MonoDelta::FromSeconds(FLAGS_test_duration_secs));
+
+  // Signal the workers, stop the checker, then wait for the workers to exit.
+  this->NotifyThreads();
+  checker.Stop();
+  for (thread& t : producers) {
+    t.join();
+  }
+  for (thread& t : consumers) {
+    t.join();
+  }
+
+  // Log the metrics: per-file detail at VLOG(2), per-action totals at INFO.
+  unordered_map<string, int> totals_by_action;
+  for (const auto& per_file : this->metrics()) {
+    const string& file_name = per_file.first;
+    for (const auto& per_action : per_file.second) {
+      const string& action = per_action.first;
+      const int count = per_action.second;
+      VLOG(2) << Substitute("$0: $1: $2", file_name, action, count);
+      totals_by_action[action] += count;
+    }
+  }
+  for (const auto& total : totals_by_action) {
+    LOG(INFO) << Substitute("$0: $1", total.first, total.second);
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/file_cache-test-util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/file_cache-test-util.h 
b/be/src/kudu/util/file_cache-test-util.h
new file mode 100644
index 0000000..8b0bcce
--- /dev/null
+++ b/be/src/kudu/util/file_cache-test-util.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <thread>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/env.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+
+// Background watchdog for file descriptor usage.
+//
+// While started, a dedicated thread samples the number of file descriptors
+// open in this process every 100ms and CHECK-fails if the count exceeds the
+// number that were open at construction time plus 'upper_bound'.
+class PeriodicOpenFdChecker {
+ public:
+  // Records the current open fd count as the baseline. The checker thread is
+  // not launched until Start() is called.
+  PeriodicOpenFdChecker(Env* env, int upper_bound)
+    : env_(env),
+      initial_fd_count_(CountOpenFds(env)),
+      max_fd_count_(upper_bound + initial_fd_count_),
+      running_(1),
+      started_(false) {}
+
+  // Stops and joins the checker thread if it is still running.
+  ~PeriodicOpenFdChecker() {
+    Stop();
+  }
+
+  // Launches the checker thread. Must not be called while already started.
+  void Start() {
+    DCHECK(!started_);
+    running_.Reset(1);
+    check_thread_ = std::thread(&PeriodicOpenFdChecker::CheckThread, this);
+    started_ = true;
+  }
+
+  // Signals the checker thread to exit and waits for it. Does nothing if the
+  // checker is not started.
+  void Stop() {
+    if (!started_) {
+      return;
+    }
+    running_.CountDown();
+    check_thread_.join();
+    started_ = false;
+  }
+
+ private:
+  // Thread body: sample, log (at most once per second), check, then wait up
+  // to 100ms for the stop signal before sampling again.
+  void CheckThread() {
+    LOG(INFO) << strings::Substitute("Periodic open fd checker starting "
+        "(initial: $0 max: $1)",
+        initial_fd_count_, max_fd_count_);
+    for (;;) {
+      const int open_fd_count = CountOpenFds(env_);
+      KLOG_EVERY_N_SECS(INFO, 1) << strings::Substitute("Open fd count: $0/$1",
+                                                        open_fd_count,
+                                                        max_fd_count_);
+      CHECK_LE(open_fd_count, max_fd_count_);
+      if (running_.WaitFor(MonoDelta::FromMilliseconds(100))) {
+        break;
+      }
+    }
+  }
+
+  Env* env_;
+
+  // Open fd count observed at construction time.
+  const int initial_fd_count_;
+
+  // Largest open fd count tolerated: the baseline plus the caller's bound.
+  const int max_fd_count_;
+
+  // Counted down by Stop() to tell the checker thread to exit.
+  CountDownLatch running_;
+  std::thread check_thread_;
+
+  // True between a call to Start() and the matching Stop().
+  bool started_;
+};
+
+} // namespace kudu

Reply via email to