http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/file_cache-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/file_cache-test.cc b/be/src/kudu/util/file_cache-test.cc
new file mode 100644
index 0000000..5ac568e
--- /dev/null
+++ b/be/src/kudu/util/file_cache-test.cc
@@ -0,0 +1,306 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/file_cache.h"
+
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/env.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/random.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_bool(cache_force_single_shard);
+DECLARE_int32(file_cache_expiry_period_ms);
+
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+template <class FileType>
+class FileCacheTest : public KuduTest {  // Typed fixture; FileType is the cached file interface.
+ public:
+  FileCacheTest()
+      : rand_(SeedRandom()),
+        initial_open_fds_(CountOpenFds(env_)) {
+    // Simplify testing of the actual cache capacity.
+    FLAGS_cache_force_single_shard = true;
+
+    // Speed up tests that check the number of descriptors.
+    FLAGS_file_cache_expiry_period_ms = 1;
+  }
+
+  void SetUp() override {  // Every test starts with a cache of capacity 1.
+    KuduTest::SetUp();
+    ASSERT_OK(ReinitCache(1));
+  }
+
+ protected:
+  Status ReinitCache(int max_open_files) {  // Replaces cache_ with a new, initialized cache.
+    cache_.reset(new FileCache<FileType>("test",
+                                         env_,
+                                         max_open_files,
+                                         nullptr));
+    return cache_->Init();
+  }
+
+  Status WriteTestFile(const string& name, const string& data) {  // Bypasses the cache.
+    unique_ptr<RWFile> f;
+    RETURN_NOT_OK(env_->NewRWFile(name, &f));
+    RETURN_NOT_OK(f->Write(0, data));  // 'data' is written at offset 0.
+    return Status::OK();
+  }
+
+  void AssertFdsAndDescriptors(int num_expected_fds,  // Relative to initial_open_fds_.
+                               int num_expected_descriptors) {
+    ASSERT_EQ(initial_open_fds_ + num_expected_fds, CountOpenFds(env_));
+
+    // The expiry thread may take some time to run.
+    ASSERT_EVENTUALLY([&]() {
+      ASSERT_EQ(num_expected_descriptors, cache_->NumDescriptorsForTests());
+    });
+  }
+
+  Random rand_;
+  const int initial_open_fds_;  // Open fd count sampled when the fixture is constructed.
+  unique_ptr<FileCache<FileType>> cache_;
+};
+
+typedef ::testing::Types<RWFile, RandomAccessFile> FileTypes;  // Types each TYPED_TEST runs with.
+TYPED_TEST_CASE(FileCacheTest, FileTypes);
+
+TYPED_TEST(FileCacheTest, TestBasicOperations) {
+  // Open a non-existent file.
+  {
+    shared_ptr<TypeParam> f;
+    ASSERT_TRUE(this->cache_->OpenExistingFile(
+        "/does/not/exist", &f).IsNotFound());
+    NO_FATALS(this->AssertFdsAndDescriptors(0, 0));
+  }
+
+  const string kFile1 = this->GetTestPath("foo");
+  const string kFile2 = this->GetTestPath("bar");
+  const string kData1 = "test data 1";
+  const string kData2 = "test data 2";
+
+  // Create some test files.
+  ASSERT_OK(this->WriteTestFile(kFile1, kData1));
+  ASSERT_OK(this->WriteTestFile(kFile2, kData2));
+  NO_FATALS(this->AssertFdsAndDescriptors(0, 0));
+
+  {
+    // Open a test file. It should open an fd and create a descriptor.
+    shared_ptr<TypeParam> f1;
+    ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f1));
+    NO_FATALS(this->AssertFdsAndDescriptors(1, 1));
+
+    // Spot check the test data by comparing sizes.
+    for (int i = 0; i < 3; i++) {
+      uint64_t size;
+      ASSERT_OK(f1->Size(&size));
+      ASSERT_EQ(kData1.size(), size);
+      NO_FATALS(this->AssertFdsAndDescriptors(1, 1));
+    }
+
+    // Open the same file a second time. It should reuse the existing
+    // descriptor and not open a second fd.
+    shared_ptr<TypeParam> f2;
+    ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f2));
+    NO_FATALS(this->AssertFdsAndDescriptors(1, 1));
+    {
+      Cache::UniqueHandle uh(
+          this->cache_->cache_->Lookup(kFile1, Cache::EXPECT_IN_CACHE),
+          Cache::HandleDeleter(this->cache_->cache_.get()));
+      ASSERT_TRUE(uh.get());  // The underlying LRU cache holds an entry for kFile1.
+    }
+
+    // Open a second file. This will create a new descriptor, but evict the fd
+    // opened for the first file, so the fd count should remain constant.
+    shared_ptr<TypeParam> f3;
+    ASSERT_OK(this->cache_->OpenExistingFile(kFile2, &f3));
+    NO_FATALS(this->AssertFdsAndDescriptors(1, 2));
+    {
+      Cache::UniqueHandle uh(
+          this->cache_->cache_->Lookup(kFile1, Cache::EXPECT_IN_CACHE),
+          Cache::HandleDeleter(this->cache_->cache_.get()));
+      ASSERT_FALSE(uh.get());  // kFile1's entry was evicted.
+    }
+    {
+      Cache::UniqueHandle uh(
+          this->cache_->cache_->Lookup(kFile2, Cache::EXPECT_IN_CACHE),
+          Cache::HandleDeleter(this->cache_->cache_.get()));
+      ASSERT_TRUE(uh.get());  // kFile2 now occupies the single cache slot.
+    }
+  }
+
+  // The descriptors are all out of scope, but the open fds remain in the cache.
+  NO_FATALS(this->AssertFdsAndDescriptors(1, 0));
+
+  // With the cache gone, so are the cached fds.
+  this->cache_.reset();
+  ASSERT_EQ(this->initial_open_fds_, CountOpenFds(this->env_));
+}
+
+TYPED_TEST(FileCacheTest, TestDeletion) {
+  // Deleting a file that doesn't exist fails with NotFound.
+  ASSERT_TRUE(this->cache_->DeleteFile("/does/not/exist").IsNotFound());
+
+  // Create a test file, then delete it. It will be deleted immediately.
+  const string kFile1 = this->GetTestPath("foo");
+  const string kData1 = "test data 1";
+  ASSERT_OK(this->WriteTestFile(kFile1, kData1));
+  ASSERT_TRUE(this->env_->FileExists(kFile1));
+  ASSERT_OK(this->cache_->DeleteFile(kFile1));
+  ASSERT_FALSE(this->env_->FileExists(kFile1));
+
+  // Trying to delete it again fails.
+  ASSERT_TRUE(this->cache_->DeleteFile(kFile1).IsNotFound());
+
+  // Create another test file, open it, then delete it. The delete is not
+  // effected until the last open descriptor is closed. In between, the
+  // cache won't allow the file to be opened again.
+  const string kFile2 = this->GetTestPath("bar");
+  const string kData2 = "test data 2";
+  ASSERT_OK(this->WriteTestFile(kFile2, kData2));
+  ASSERT_TRUE(this->env_->FileExists(kFile2));
+  {
+    shared_ptr<TypeParam> f1;
+    ASSERT_OK(this->cache_->OpenExistingFile(kFile2, &f1));
+    ASSERT_EQ(this->initial_open_fds_ + 1, CountOpenFds(this->env_));
+    ASSERT_OK(this->cache_->DeleteFile(kFile2));  // Only marks the descriptor as deleted.
+    {
+      shared_ptr<TypeParam> f2;
+      ASSERT_TRUE(this->cache_->OpenExistingFile(kFile2, &f2).IsNotFound());
+    }
+    ASSERT_TRUE(this->cache_->DeleteFile(kFile2).IsNotFound());
+    ASSERT_TRUE(this->env_->FileExists(kFile2));  // Still on disk while f1 is alive.
+    ASSERT_EQ(this->initial_open_fds_ + 1, CountOpenFds(this->env_));
+  }
+  ASSERT_FALSE(this->env_->FileExists(kFile2));
+  ASSERT_EQ(this->initial_open_fds_, CountOpenFds(this->env_));
+
+  // Create a test file, open it, and let it go out of scope before
+  // deleting it. The deletion should evict the fd and close it, despite
+  // happening after the descriptor is gone.
+  const string kFile3 = this->GetTestPath("baz");
+  const string kData3 = "test data 3";
+  ASSERT_OK(this->WriteTestFile(kFile3, kData3));
+  {
+    shared_ptr<TypeParam> f3;
+    ASSERT_OK(this->cache_->OpenExistingFile(kFile3, &f3));
+  }
+  ASSERT_TRUE(this->env_->FileExists(kFile3));
+  ASSERT_EQ(this->initial_open_fds_ + 1, CountOpenFds(this->env_));
+  ASSERT_OK(this->cache_->DeleteFile(kFile3));
+  ASSERT_FALSE(this->env_->FileExists(kFile3));
+  ASSERT_EQ(this->initial_open_fds_, CountOpenFds(this->env_));
+}
+
+TYPED_TEST(FileCacheTest, TestHeavyReads) {
+  const int kNumFiles = 20;
+  const int kNumIterations = 100;
+  const int kCacheCapacity = 5;  // Smaller than kNumFiles; bounds the fd count asserted below.
+
+  ASSERT_OK(this->ReinitCache(kCacheCapacity));
+
+  // Randomly generate some data.
+  string data;
+  for (int i = 0; i < 1000; i++) {
+    data += Substitute("$0", this->rand_.Next());
+  }
+
+  // Write that data to a bunch of files and open them through the cache.
+  vector<shared_ptr<TypeParam>> opened_files;
+  for (int i = 0; i < kNumFiles; i++) {
+    string filename = this->GetTestPath(Substitute("$0", i));
+    ASSERT_OK(this->WriteTestFile(filename, data));
+    shared_ptr<TypeParam> f;
+    ASSERT_OK(this->cache_->OpenExistingFile(filename, &f));
+    opened_files.push_back(f);
+  }
+
+  // Read back the data at random through the cache.
+  unique_ptr<uint8_t[]> buf(new uint8_t[data.length()]);
+  for (int i = 0; i < kNumIterations; i++) {
+    int idx = this->rand_.Uniform(opened_files.size());
+    const auto& f = opened_files[idx];
+    uint64_t size;
+    ASSERT_OK(f->Size(&size));
+    Slice s(buf.get(), size);  // Every file holds 'data', so 'size' should equal data.length().
+    ASSERT_OK(f->Read(0, &s));
+    ASSERT_EQ(data, s);
+    ASSERT_LE(CountOpenFds(this->env_),
+              this->initial_open_fds_ + kCacheCapacity);
+  }
+}
+
+TYPED_TEST(FileCacheTest, TestNoRecursiveDeadlock) {
+  // This test triggered a deadlock in a previous implementation, when expired
+  // weak_ptrs were removed from the descriptor map in the descriptor's
+  // destructor.
+  alarm(60);  // Watchdog: presumably SIGALRM kills the test if it hangs for 60 seconds.
+  auto cleanup = MakeScopedCleanup([]() {
+    alarm(0);  // Cancels the pending alarm on scope exit.
+  });
+
+  const string kFile = this->GetTestPath("foo");
+  ASSERT_OK(this->WriteTestFile(kFile, "test data"));
+
+  vector<std::thread> threads;
+  for (int i = 0; i < 2; i++) {
+    threads.emplace_back([&]() {
+      for (int i = 0; i < 10000; i++) {  // NOTE(review): shadows the outer loop's 'i'.
+        shared_ptr<TypeParam> f;
+        CHECK_OK(this->cache_->OpenExistingFile(kFile, &f));
+      }
+    });
+  }
+
+  for (auto& t : threads) {
+    t.join();
+  }
+}
+
+class RandomAccessFileCacheTest : public FileCacheTest<RandomAccessFile> {
+};  // Non-typed fixture for tests that only apply to RandomAccessFile.
+
+TEST_F(RandomAccessFileCacheTest, TestMemoryFootprintDoesNotCrash) {
+  const string kFile = this->GetTestPath("foo");
+  ASSERT_OK(this->WriteTestFile(kFile, "test data"));
+
+  shared_ptr<RandomAccessFile> f;
+  ASSERT_OK(this->cache_->OpenExistingFile(kFile, &f));
+
+  // This used to crash due to a kudu_malloc_usable_size() call on a memory
+  // address that wasn't the start of an actual heap allocation.
+  LOG(INFO) << f->memory_footprint();  // The value is only logged, never asserted.
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/file_cache.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/file_cache.cc b/be/src/kudu/util/file_cache.cc
new file mode 100644
index 0000000..b8d17cf
--- /dev/null
+++ b/be/src/kudu/util/file_cache.cc
@@ -0,0 +1,629 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/file_cache.h"
+
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include <gflags/gflags.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/env.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/malloc.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/once.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+DEFINE_int32(file_cache_expiry_period_ms, 60 * 1000,  // Default: one minute.
+             "Period of time (in ms) between removing expired file cache descriptors");
+TAG_FLAG(file_cache_expiry_period_ms, advanced);
+
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+namespace {
+
+template <class FileType>
+FileType* CacheValueToFileType(Slice s) {  // Reads the pointer stored in the slice's bytes.
+  return reinterpret_cast<FileType*>(*reinterpret_cast<void**>(
+      s.mutable_data()));
+}
+
+template <class FileType>
+class EvictionCallback : public Cache::EvictionCallback {  // Frees the file held by an entry.
+ public:
+  EvictionCallback() {}
+
+  void EvictedEntry(Slice key, Slice value) override {  // 'value' holds a FileType pointer.
+    VLOG(2) << "Evicted fd belonging to " << key.ToString();
+    delete CacheValueToFileType<FileType>(value);
+  }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(EvictionCallback);
+};
+
+} // anonymous namespace
+
+namespace internal {
+
+template <class FileType>
+class ScopedOpenedDescriptor;
+
+// Encapsulates common descriptor fields and methods.
+template <class FileType>
+class BaseDescriptor {
+ public:
+  BaseDescriptor(FileCache<FileType>* file_cache,
+                 const string& filename)
+      : file_cache_(file_cache),
+        file_name_(filename) {}
+
+  ~BaseDescriptor() {
+    VLOG(2) << "Out of scope descriptor with file name: " << filename();
+
+    // The (now expired) weak_ptr remains in 'descriptors_', to be removed by
+    // the next call to RunDescriptorExpiry(). Removing it here would risk a
+    // deadlock on recursive acquisition of 'lock_'.
+
+    if (deleted_) {  // Deferred deletion requested through MarkDeleted().
+      cache()->Erase(filename());
+
+      VLOG(1) << "Deleting file: " << filename();
+      WARN_NOT_OK(env()->DeleteFile(filename()), "");  // A failure is only logged.
+    }
+  }
+
+  // Insert a pointer to an open file object into the file cache with the
+  // filename as the cache key.
+  //
+  // Returns a handle to the inserted entry. The handle always contains an open
+  // file.
+  ScopedOpenedDescriptor<FileType> InsertIntoCache(void* file_ptr) const {
+    // The allocated charge is always one byte. This is incorrect with respect
+    // to memory tracking, but it's necessary if the cache capacity is to be
+    // equivalent to the max number of fds.
+    Cache::PendingHandle* pending = CHECK_NOTNULL(cache()->Allocate(
+        filename(), sizeof(file_ptr), 1));
+    memcpy(cache()->MutableValue(pending),  // The cache value is the pointer itself.
+           &file_ptr,
+           sizeof(file_ptr));
+    return ScopedOpenedDescriptor<FileType>(this, Cache::UniqueHandle(
+        cache()->Insert(pending, file_cache_->eviction_cb_.get()),
+        Cache::HandleDeleter(cache())));
+  }
+
+  // Retrieves a pointer to an open file object from the file cache with the
+  // filename as the cache key.
+  //
+  // Returns a handle to the looked up entry. The handle may or may not contain
+  // an open file, depending on whether the cache hit or missed.
+  ScopedOpenedDescriptor<FileType> LookupFromCache() const {
+    return ScopedOpenedDescriptor<FileType>(this, Cache::UniqueHandle(
+        cache()->Lookup(filename(), Cache::EXPECT_IN_CACHE),
+        Cache::HandleDeleter(cache())));
+  }
+
+  // Mark this descriptor as to-be-deleted later.
+  void MarkDeleted() {
+    DCHECK(!deleted_);  // Marking twice is treated as a programming error.
+    deleted_ = true;
+  }
+
+  Cache* cache() const { return file_cache_->cache_.get(); }
+
+  Env* env() const { return file_cache_->env_; }
+
+  const string& filename() const { return file_name_; }
+
+  bool deleted() const { return deleted_; }
+
+ private:
+  FileCache<FileType>* file_cache_;  // Never deleted by this class.
+  const string file_name_;
+
+  bool deleted_ = false;  // Set by MarkDeleted(); acted upon in the destructor.
+
+  DISALLOW_COPY_AND_ASSIGN(BaseDescriptor);
+};
+
+// A "smart" retrieved LRU cache handle.
+//
+// The cache handle is released when this object goes out of scope, possibly
+// closing the opened file if it is no longer in the cache.
+template <class FileType>
+class ScopedOpenedDescriptor {
+ public:
+  // A not-yet-but-soon-to-be opened descriptor.
+  explicit ScopedOpenedDescriptor(const BaseDescriptor<FileType>* desc)
+      : desc_(desc),
+        handle_(nullptr, Cache::HandleDeleter(desc_->cache())) {
+  }
+
+  // An opened descriptor. Its handle may or may not contain an open file.
+  ScopedOpenedDescriptor(const BaseDescriptor<FileType>* desc,
+                         Cache::UniqueHandle handle)
+      : desc_(desc),
+        handle_(std::move(handle)) {
+  }
+
+  bool opened() const { return handle_.get(); }  // True iff a cache handle is held.
+
+  FileType* file() const {  // Only valid when opened(); decodes the cached pointer.
+    DCHECK(opened());
+    return CacheValueToFileType<FileType>(desc_->cache()->Value(handle_.get()));
+  }
+
+ private:
+  const BaseDescriptor<FileType>* desc_;
+  Cache::UniqueHandle handle_;
+};
+
+// Reference to an on-disk file that may or may not be opened (and thus
+// cached) in the file cache.
+//
+// This empty primary template is just a specification; actual descriptor
+// classes must be fully specialized (as done for RWFile and RandomAccessFile).
+template <class FileType>
+class Descriptor : public FileType {
+};
+
+// A descriptor adhering to the RWFile interface (i.e. when opened, provides
+// a read-write interface to the underlying file).
+template <>
+class Descriptor<RWFile> : public RWFile {  // I/O methods forward to the cached file.
+ public:
+  Descriptor(FileCache<RWFile>* file_cache, const string& filename)
+      : base_(file_cache, filename) {}
+
+  ~Descriptor() = default;
+
+  Status Read(uint64_t offset, Slice* result) const override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->Read(offset, result);
+  }
+
+  Status ReadV(uint64_t offset, vector<Slice>* results) const override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->ReadV(offset, results);
+  }
+
+  Status Write(uint64_t offset, const Slice& data) override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->Write(offset, data);
+  }
+
+  Status WriteV(uint64_t offset, const vector<Slice> &data) override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->WriteV(offset, data);
+  }
+
+  Status PreAllocate(uint64_t offset, size_t length, PreAllocateMode mode) override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->PreAllocate(offset, length, mode);
+  }
+
+  Status Truncate(uint64_t length) override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->Truncate(length);
+  }
+
+  Status PunchHole(uint64_t offset, size_t length) override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->PunchHole(offset, length);
+  }
+
+  Status Flush(FlushMode mode, uint64_t offset, size_t length) override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->Flush(mode, offset, length);
+  }
+
+  Status Sync() override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->Sync();
+  }
+
+  Status Close() override {
+    // Intentional no-op; actual closing is deferred to LRU cache eviction.
+    return Status::OK();
+  }
+
+  Status Size(uint64_t* size) const override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->Size(size);
+  }
+
+  Status GetExtentMap(ExtentMap* out) const override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->GetExtentMap(out);
+  }
+
+  const string& filename() const override {
+    return base_.filename();
+  }
+
+ private:
+  friend class FileCache<RWFile>;
+
+  Status Init() {  // Runs InitOnce() through once_.
+    return once_.Init(&Descriptor<RWFile>::InitOnce, this);
+  }
+
+  Status InitOnce() {  // Opens the file into the cache; the handle is not kept.
+    return ReopenFileIfNecessary(nullptr);
+  }
+
+  Status ReopenFileIfNecessary(ScopedOpenedDescriptor<RWFile>* out) const {
+    ScopedOpenedDescriptor<RWFile> found(base_.LookupFromCache());
+    if (found.opened()) {
+      // The file is already open in the cache, return it.
+      if (out) {
+        *out = std::move(found);
+      }
+      return Status::OK();
+    }
+
+    // The file was evicted, reopen it.
+    //
+    // Because the file may be evicted at any time we must use 'sync_on_close'
+    // (note: sync is a no-op if the file isn't dirty).
+    RWFileOptions opts;
+    opts.sync_on_close = true;
+    opts.mode = Env::OPEN_EXISTING;
+    unique_ptr<RWFile> f;
+    RETURN_NOT_OK(base_.env()->NewRWFile(opts, base_.filename(), &f));
+
+    // The cache will take ownership of the newly opened file.
+    ScopedOpenedDescriptor<RWFile> opened(base_.InsertIntoCache(f.release()));
+    if (out) {
+      *out = std::move(opened);
+    }
+    return Status::OK();
+  }
+
+  BaseDescriptor<RWFile> base_;
+  KuduOnceDynamic once_;
+
+  DISALLOW_COPY_AND_ASSIGN(Descriptor);
+};
+
+// A descriptor adhering to the RandomAccessFile interface (i.e. when opened,
+// provides a read-only interface to the underlying file).
+template <>
+class Descriptor<RandomAccessFile> : public RandomAccessFile {  // Forwards to the cached file.
+ public:
+  Descriptor(FileCache<RandomAccessFile>* file_cache, const string& filename)
+      : base_(file_cache, filename) {}
+
+  ~Descriptor() = default;
+
+  Status Read(uint64_t offset, Slice* result) const override {
+    ScopedOpenedDescriptor<RandomAccessFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->Read(offset, result);
+  }
+
+  Status ReadV(uint64_t offset, vector<Slice>* results) const override {
+    ScopedOpenedDescriptor<RandomAccessFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->ReadV(offset, results);
+  }
+
+  Status Size(uint64_t *size) const override {
+    ScopedOpenedDescriptor<RandomAccessFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->Size(size);
+  }
+
+  const string& filename() const override {
+    return base_.filename();
+  }
+
+  size_t memory_footprint() const override {
+    // Normally we would use kudu_malloc_usable_size(this). However, that's
+    // not safe because 'this' was allocated via std::make_shared(), which
+    // means it isn't necessarily the base of the memory allocation; it may be
+    // preceded by the shared_ptr control block.
+    //
+    // It doesn't appear possible to get the base of the allocation via any
+    // shared_ptr APIs, so we'll use sizeof(*this) + 16 instead. The 16 bytes
+    // represent the shared_ptr control block. Overall the object size is still
+    // undercounted as it doesn't account for any internal heap fragmentation,
+    // but at least it's safe.
+    //
+    // Some anecdotal memory measurements taken inside gdb:
+    // - glibc 2.23 malloc_usable_size() on make_shared<FileType>: 88 bytes.
+    // - tcmalloc malloc_usable_size() on make_shared<FileType>: 96 bytes.
+    // - sizeof(std::_Sp_counted_base<>) with libstdc++ 5.4: 16 bytes.
+    // - sizeof(std::__1::__shared_ptr_emplace<>) with libc++ 3.9: 16 bytes.
+    // - sizeof(*this): 72 bytes.
+    return sizeof(*this) +
+        16 + // shared_ptr control block
+        once_.memory_footprint_excluding_this() +
+        base_.filename().capacity();
+  }
+
+ private:
+  friend class FileCache<RandomAccessFile>;
+
+  Status Init() {  // Runs InitOnce() through once_.
+    return once_.Init(&Descriptor<RandomAccessFile>::InitOnce, this);
+  }
+
+  Status InitOnce() {  // Opens the file into the cache; the handle is not kept.
+    return ReopenFileIfNecessary(nullptr);
+  }
+
+  Status ReopenFileIfNecessary(
+      ScopedOpenedDescriptor<RandomAccessFile>* out) const {  // 'out' may be null.
+    ScopedOpenedDescriptor<RandomAccessFile> found(base_.LookupFromCache());
+    if (found.opened()) {
+      // The file is already open in the cache, return it.
+      if (out) {
+        *out = std::move(found);
+      }
+      return Status::OK();
+    }
+
+    // The file was evicted, reopen it.
+    unique_ptr<RandomAccessFile> f;
+    RETURN_NOT_OK(base_.env()->NewRandomAccessFile(base_.filename(), &f));
+
+    // The cache will take ownership of the newly opened file.
+    ScopedOpenedDescriptor<RandomAccessFile> opened(
+        base_.InsertIntoCache(f.release()));
+    if (out) {
+      *out = std::move(opened);
+    }
+    return Status::OK();
+  }
+
+  BaseDescriptor<RandomAccessFile> base_;
+  KuduOnceDynamic once_;
+
+  DISALLOW_COPY_AND_ASSIGN(Descriptor);
+};
+
+} // namespace internal
+
+template <class FileType>
+FileCache<FileType>::FileCache(const string& cache_name,
+                               Env* env,
+                               int max_open_files,
+                               const scoped_refptr<MetricEntity>& entity)
+    : env_(env),
+      cache_name_(cache_name),
+      eviction_cb_(new EvictionCallback<FileType>()),
+      cache_(NewLRUCache(DRAM_CACHE, max_open_files, cache_name)),
+      running_(1) {  // Counted down once, by the destructor.
+  if (entity) {  // Metrics are optional; a null entity skips registration.
+    cache_->SetMetrics(entity);
+  }
+  LOG(INFO) << Substitute("Constructed file cache $0 with capacity $1",
+                          cache_name, max_open_files);
+}
+
+template <class FileType>
+FileCache<FileType>::~FileCache() {
+  running_.CountDown();  // Signals RunDescriptorExpiry() to leave its wait loop.
+  if (descriptor_expiry_thread_) {  // Null if Init() was never called or failed.
+    descriptor_expiry_thread_->Join();
+  }
+}
+
+template <class FileType>
+Status FileCache<FileType>::Init() {  // Starts the thread running RunDescriptorExpiry().
+  return Thread::Create("cache", Substitute("$0-evict", cache_name_),
+                        &FileCache::RunDescriptorExpiry, this,
+                        &descriptor_expiry_thread_);
+}
+
+template <class FileType>
+Status FileCache<FileType>::OpenExistingFile(const string& file_name,
+                                             shared_ptr<FileType>* file) {
+  shared_ptr<internal::Descriptor<FileType>> desc;  // Outlives the lock scope below.
+  {
+    // Find an existing descriptor, or create one if none exists.
+    std::lock_guard<simple_spinlock> l(lock_);
+    RETURN_NOT_OK(FindDescriptorUnlocked(file_name, &desc));  // NotFound if marked deleted.
+    if (desc) {
+      VLOG(2) << "Found existing descriptor: " << desc->filename();
+    } else {
+      desc = std::make_shared<internal::Descriptor<FileType>>(this, file_name);
+      InsertOrDie(&descriptors_, file_name, desc);
+      VLOG(2) << "Created new descriptor: " << desc->filename();
+    }
+  }
+
+  // Check that the underlying file can be opened (no-op for found
+  // descriptors). Done outside the lock.
+  RETURN_NOT_OK(desc->Init());
+  *file = desc;  // Descriptor<FileType> derives from FileType.
+  return Status::OK();
+}
+
+template <class FileType>
+Status FileCache<FileType>::DeleteFile(const string& file_name) {
+  // Declared before the lock scope so that, should this be the last reference,
+  // the descriptor's destructor (cache erase + file deletion) runs unlocked.
+  shared_ptr<internal::Descriptor<FileType>> desc;
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    RETURN_NOT_OK(FindDescriptorUnlocked(file_name, &desc));
+    if (desc) {
+      VLOG(2) << "Marking file for deletion: " << file_name;
+      desc->base_.MarkDeleted();
+      return Status::OK();
+    }
+  }
+
+  // There is no outstanding descriptor. Delete the file now.
+  //
+  // Make sure it's been fully evicted from the cache (perhaps it was opened
+  // previously?) so that the filesystem can reclaim the file data instantly.
+  cache_->Erase(file_name);
+  return env_->DeleteFile(file_name);
+}
+
+template <class FileType>
+int FileCache<FileType>::NumDescriptorsForTests() const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  return descriptors_.size();  // Counts expired entries too, until they are purged.
+}
+
+template <class FileType>
+string FileCache<FileType>::ToDebugString() const {  // One line per descriptor map entry.
+  std::lock_guard<simple_spinlock> l(lock_);
+  string ret;
+  for (const auto& e : descriptors_) {
+    bool strong = false;
+    bool deleted = false;
+    bool opened = false;
+    shared_ptr<internal::Descriptor<FileType>> desc = e.second.lock();
+    if (desc) {
+      strong = true;
+      if (desc->base_.deleted()) {
+        deleted = true;
+      }
+      internal::ScopedOpenedDescriptor<FileType> o(
+          desc->base_.LookupFromCache());
+      if (o.opened()) {
+        opened = true;
+      }
+    }
+    if (strong) {  // S = live descriptor, D = marked deleted, O = open in the cache.
+      ret += Substitute("$0 (S$1$2)\n", e.first,
+                        deleted ? "D" : "", opened ? "O" : "");
+    } else {  // Expired entry: only the file name is printed.
+      ret += Substitute("$0\n", e.first);
+    }
+  }
+  return ret;
+}
+
+template <class FileType>
+Status FileCache<FileType>::FindDescriptorUnlocked(
+    const string& file_name,
+    shared_ptr<internal::Descriptor<FileType>>* file) {
+  DCHECK(lock_.is_locked());  // The caller must already hold lock_.
+
+  auto it = descriptors_.find(file_name);
+  if (it != descriptors_.end()) {
+    // Found the descriptor. Has it expired?
+    shared_ptr<internal::Descriptor<FileType>> desc = it->second.lock();
+    if (desc) {
+      if (desc->base_.deleted()) {
+        return Status::NotFound("File already marked for deletion", file_name);
+      }
+
+      // Descriptor is still valid, return it.
+      if (file) {
+        *file = desc;
+      }
+      return Status::OK();
+    }
+    // Descriptor has expired; erase it and pretend we found nothing.
+    descriptors_.erase(it);
+  }
+  return Status::OK();  // A miss is not an error; '*file' is left untouched.
+}
+
+template <class FileType>
+void FileCache<FileType>::RunDescriptorExpiry() {  // Body of the thread created by Init().
+  while (!running_.WaitFor(MonoDelta::FromMilliseconds(
+      FLAGS_file_cache_expiry_period_ms))) {  // Presumably true once the latch hits zero.
+    std::lock_guard<simple_spinlock> l(lock_);
+    for (auto it = descriptors_.begin(); it != descriptors_.end();) {
+      if (it->second.expired()) {  // No shared_ptr to this descriptor remains.
+        it = descriptors_.erase(it);
+      } else {
+        it++;
+      }
+    }
+  }
+}
+
+// Explicit instantiations for callers outside this compilation unit.
+template
+FileCache<RWFile>::FileCache(
+    const string& cache_name,
+    Env* env,
+    int max_open_files,
+    const scoped_refptr<MetricEntity>& entity);
+template
+FileCache<RWFile>::~FileCache();
+template
+Status FileCache<RWFile>::Init();
+template
+Status FileCache<RWFile>::OpenExistingFile(
+    const string& file_name,
+    shared_ptr<RWFile>* file);
+template
+Status FileCache<RWFile>::DeleteFile(const string& file_name);
+template
+int FileCache<RWFile>::NumDescriptorsForTests() const;
+template
+string FileCache<RWFile>::ToDebugString() const;
+
+template
+FileCache<RandomAccessFile>::FileCache(
+    const string& cache_name,
+    Env* env,
+    int max_open_files,
+    const scoped_refptr<MetricEntity>& entity);
+template
+FileCache<RandomAccessFile>::~FileCache();
+template
+Status FileCache<RandomAccessFile>::Init();
+template
+Status FileCache<RandomAccessFile>::OpenExistingFile(
+    const string& file_name,
+    shared_ptr<RandomAccessFile>* file);
+template
+Status FileCache<RandomAccessFile>::DeleteFile(const string& file_name);
+template
+int FileCache<RandomAccessFile>::NumDescriptorsForTests() const;
+template
+string FileCache<RandomAccessFile>::ToDebugString() const;
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/file_cache.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/file_cache.h b/be/src/kudu/util/file_cache.h
new file mode 100644
index 0000000..3ef87ad
--- /dev/null
+++ b/be/src/kudu/util/file_cache.h
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/env.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+namespace internal {
+
+template <class FileType>
+class BaseDescriptor;
+
+template <class FileType>
+class Descriptor;
+
+} // namespace internal
+
+class MetricEntity;
+class Thread;
+
+// Cache of open files.
+//
+// The purpose of this cache is to enforce an upper bound on the maximum number
+// of files open at a time. Files opened through the cache may be closed at any
+// time, only to be reopened upon next use.
+//
+// The file cache can be viewed as having two logical parts: the client-facing
+// API and the LRU cache.
+//
+// Client-facing API
+// -----------------
+// The core of the client-facing API is the cache descriptor. A descriptor
+// uniquely identifies an opened file. To a client, a descriptor is just an
+// open file interface of the variety defined in util/env.h. Clients open
+// descriptors via the OpenExistingFile() cache method.
+//
+// Descriptors are shared objects; an existing descriptor is handed back to a
+// client if a file with the same name is already opened. To facilitate
+// descriptor sharing, the file cache maintains a by-file-name descriptor map.
+// The values are weak references to the descriptors so that map entries don't
+// affect the descriptor lifecycle.
+//
+// LRU cache
+// ---------
+// The lower half of the file cache is a standard LRU cache whose keys are file
+// names and whose values are pointers to opened file objects allocated on the
+// heap. Unlike the descriptor map, this cache has an upper bound on capacity,
+// and handles are evicted (and closed) according to an LRU algorithm.
+//
+// Whenever a descriptor is used by a client in file I/O, its file name is used
+// in an LRU cache lookup. If found, the underlying file is still open and the
+// file access is performed. Otherwise, the file must have been evicted and
+// closed, so it is reopened and reinserted (possibly evicting a different open
+// file) before the file access is performed.
+//
+// Other notes
+// -----------
+// In a world where files are opened and closed transparently, file deletion
+// demands special care if UNIX semantics are to be preserved. When a call to
+// DeleteFile() is made to a file with an opened descriptor, the descriptor is
+// simply "marked" as to-be-deleted-later. Only when all references to the
+// descriptor are dropped is the file actually deleted. If there is no open
+// descriptor, the file is deleted immediately.
+//
+// Every public method in the file cache is thread safe.
+template <class FileType>
+class FileCache {
+ public:
+  // Creates a new file cache.
+  //
+  // The 'cache_name' is used to disambiguate amongst other file cache
+  // instances. The cache will use 'max_open_files' as a soft upper bound on
+  // the number of files open at any given time.
+  FileCache(const std::string& cache_name,
+            Env* env,
+            int max_open_files,
+            const scoped_refptr<MetricEntity>& entity);
+
+  // Destroys the file cache.
+  ~FileCache();
+
+  // Initializes the file cache. Initialization done here may fail.
+  Status Init();
+
+  // Opens an existing file by name through the cache.
+  //
+  // The returned 'file' is actually an object called a descriptor. It adheres
+  // to a file-like interface but interfaces with the cache under the hood to
+  // reopen a file as needed during file operations.
+  //
+  // The descriptor is opened immediately to verify that the on-disk file can
+  // be opened, but may be closed later if the cache reaches its upper bound on
+  // the number of open files.
+  Status OpenExistingFile(const std::string& file_name,
+                          std::shared_ptr<FileType>* file);
+
+  // Deletes a file by name through the cache.
+  //
+  // If there is an outstanding descriptor for the file, the deletion will be
+  // deferred until the last referent is dropped. Otherwise, the file is
+  // deleted immediately.
+  Status DeleteFile(const std::string& file_name);
+
+  // Returns the number of entries in the descriptor map.
+  //
+  // Only intended for unit tests.
+  int NumDescriptorsForTests() const;
+
+  // Dumps the contents of the file cache. Intended for debugging.
+  std::string ToDebugString() const;
+
+ private:
+  friend class internal::BaseDescriptor<FileType>;
+
+  // Befriends the TestBasicOperations test of the templated FileCacheTest
+  // fixture.
+  template<class FileType2>
+  FRIEND_TEST(FileCacheTest, TestBasicOperations);
+
+  // Looks up a descriptor by file name.
+  //
+  // Must be called with 'lock_' held.
+  Status FindDescriptorUnlocked(
+      const std::string& file_name,
+      std::shared_ptr<internal::Descriptor<FileType>>* file);
+
+  // Periodically removes expired descriptors from 'descriptors_'.
+  void RunDescriptorExpiry();
+
+  // Interface to the underlying filesystem.
+  Env* env_;
+
+  // Name of the cache.
+  const std::string cache_name_;
+
+  // Invoked whenever a cached file reaches zero references (i.e. it was
+  // removed from the cache and is no longer in use by any file operations).
+  std::unique_ptr<Cache::EvictionCallback> eviction_cb_;
+
+  // Underlying cache instance. Caches opened files.
+  std::unique_ptr<Cache> cache_;
+
+  // Protects the descriptor map.
+  mutable simple_spinlock lock_;
+
+  // Maps filenames to descriptors. The values are weak references, so an
+  // entry may refer to a descriptor that has already been destroyed.
+  std::unordered_map<std::string,
+                     std::weak_ptr<internal::Descriptor<FileType>>> descriptors_;
+
+  // Calls RunDescriptorExpiry() in a loop until 'running_' isn't set.
+  scoped_refptr<Thread> descriptor_expiry_thread_;
+
+  // Tracks whether or not 'descriptor_expiry_thread_' should be running.
+  CountDownLatch running_;
+
+  DISALLOW_COPY_AND_ASSIGN(FileCache);
+};
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/flag_tags-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/flag_tags-test.cc 
b/be/src/kudu/util/flag_tags-test.cc
new file mode 100644
index 0000000..13ee535
--- /dev/null
+++ b/be/src/kudu/util/flag_tags-test.cc
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <gflags/gflags.h>
+#include <unordered_set>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/flags.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/logging_test_util.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_bool(never_fsync);
+
+// Flags defined only for the tests below: one with no tags, one with a single
+// tag, one with two tags, and one flag for each of the 'unsafe',
+// 'experimental' and 'sensitive' tags.
+DEFINE_int32(flag_with_no_tags, 0, "test flag that has no tags");
+
+DEFINE_int32(flag_with_one_tag, 0, "test flag that has 1 tag");
+TAG_FLAG(flag_with_one_tag, stable);
+
+DEFINE_int32(flag_with_two_tags, 0, "test flag that has 2 tags");
+TAG_FLAG(flag_with_two_tags, evolving);
+TAG_FLAG(flag_with_two_tags, unsafe);
+
+DEFINE_bool(test_unsafe_flag, false, "an unsafe flag");
+TAG_FLAG(test_unsafe_flag, unsafe);
+
+DEFINE_bool(test_experimental_flag, false, "an experimental flag");
+TAG_FLAG(test_experimental_flag, experimental);
+
+DEFINE_bool(test_sensitive_flag, false, "a sensitive flag");
+TAG_FLAG(test_sensitive_flag, sensitive);
+
+using std::string;
+using std::unordered_set;
+
+namespace kudu {
+
+// Test fixture for the flag tag tests; adds nothing on top of KuduTest.
+class FlagTagsTest : public KuduTest {};
+
+// Checks that GetFlagTags() reports exactly the tags attached via TAG_FLAG,
+// and reports nothing for untagged or unknown flags.
+TEST_F(FlagTagsTest, TestTags) {
+  unordered_set<string> found;
+
+  // A flag that was never tagged.
+  GetFlagTags("flag_with_no_tags", &found);
+  EXPECT_TRUE(found.empty());
+
+  // A flag carrying a single tag.
+  GetFlagTags("flag_with_one_tag", &found);
+  EXPECT_EQ(1, found.size());
+  EXPECT_EQ(1, found.count("stable"));
+
+  // A flag carrying two distinct tags.
+  GetFlagTags("flag_with_two_tags", &found);
+  EXPECT_EQ(2, found.size());
+  EXPECT_EQ(1, found.count("evolving"));
+  EXPECT_EQ(1, found.count("unsafe"));
+
+  // A name that does not correspond to any flag.
+  GetFlagTags("missing_flag", &found);
+  EXPECT_TRUE(found.empty());
+}
+
+// Verifies that 'unsafe' and 'experimental' flags can only be changed once
+// the matching --unlock_* flag is set, and that doing so logs a warning.
+//
+// Each scenario runs under its own gflags::FlagSaver so that the flag changes
+// do not leak into the next scenario.
+TEST_F(FlagTagsTest, TestUnlockFlags) {
+  // Setting an unsafe flag without unlocking should crash.
+  {
+    gflags::FlagSaver s;
+    gflags::SetCommandLineOption("test_unsafe_flag", "true");
+    ASSERT_DEATH({ HandleCommonFlags(); },
+                 "Flag --test_unsafe_flag is unsafe and unsupported.*"
+                 "Use --unlock_unsafe_flags to proceed");
+  }
+
+  // Setting an unsafe flag with unlocking should proceed with a warning.
+  {
+    StringVectorSink sink;
+    ScopedRegisterSink reg(&sink);
+    gflags::FlagSaver s;
+    gflags::SetCommandLineOption("test_unsafe_flag", "true");
+    gflags::SetCommandLineOption("unlock_unsafe_flags", "true");
+    HandleCommonFlags();
+    ASSERT_EQ(1, sink.logged_msgs().size());
+    ASSERT_STR_CONTAINS(sink.logged_msgs()[0],
+                        "Enabled unsafe flag: --test_unsafe_flag");
+  }
+
+  // Setting an experimental flag without unlocking should crash.
+  {
+    gflags::FlagSaver s;
+    gflags::SetCommandLineOption("test_experimental_flag", "true");
+    ASSERT_DEATH({ HandleCommonFlags(); },
+                 "Flag --test_experimental_flag is experimental and unsupported.*"
+                 "Use --unlock_experimental_flags to proceed");
+  }
+
+  // Setting an experimental flag with unlocking should proceed with a warning.
+  {
+    StringVectorSink sink;
+    ScopedRegisterSink reg(&sink);
+    gflags::FlagSaver s;
+    gflags::SetCommandLineOption("test_experimental_flag", "true");
+    gflags::SetCommandLineOption("unlock_experimental_flags", "true");
+    HandleCommonFlags();
+    ASSERT_EQ(1, sink.logged_msgs().size());
+    ASSERT_STR_CONTAINS(sink.logged_msgs()[0],
+                        "Enabled experimental flag: --test_experimental_flag");
+  }
+}
+
+// Verifies that the value of a flag tagged 'sensitive' is replaced by the
+// redaction message when all flags are dumped to a string.
+TEST_F(FlagTagsTest, TestSensitiveFlags) {
+  // Setting a sensitive flag should return a redacted value.
+  {
+    ASSERT_STR_CONTAINS(
+        CommandlineFlagsIntoString(EscapeMode::NONE),
+        strings::Substitute("--test_sensitive_flag=$0", kRedactionMessage));
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/flag_tags.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/flag_tags.cc b/be/src/kudu/util/flag_tags.cc
new file mode 100644
index 0000000..8b9ebe3
--- /dev/null
+++ b/be/src/kudu/util/flag_tags.cc
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/flag_tags.h"
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/singleton.h"
+
+#include <map>
+#include <string>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+using std::multimap;
+using std::pair;
+using std::string;
+using std::unordered_set;
+using std::vector;
+
+namespace kudu {
+namespace flag_tags_internal {
+
+// Singleton registry storing the set of tags for each flag.
+class FlagTagRegistry {
+ public:
+  // Returns the singleton instance of the registry.
+  static FlagTagRegistry* GetInstance() {
+    return Singleton<FlagTagRegistry>::get();
+  }
+
+  // Records that the flag 'name' carries 'tag'. Duplicate (name, tag) pairs
+  // are not rejected here; GetTags() reports them when reading them back.
+  void Tag(const string& name, const string& tag) {
+    tag_map_.insert(TagMap::value_type(name, tag));
+  }
+
+  // Replaces the contents of 'tags' with every tag recorded for 'name'.
+  // 'tags' is left empty if nothing was recorded under that name.
+  void GetTags(const string& name, unordered_set<string>* tags) {
+    tags->clear();
+    pair<TagMap::const_iterator, TagMap::const_iterator> range =
+      tag_map_.equal_range(name);
+    for (auto it = range.first; it != range.second; ++it) {
+      // A failed insert means the same tag was recorded twice for this flag.
+      if (!InsertIfNotPresent(tags, it->second)) {
+        LOG(DFATAL) << "Flag " << name
+                    << " was tagged more than once with the tag '"
+                    << it->second << "'";
+      }
+    }
+  }
+
+ private:
+  friend class Singleton<FlagTagRegistry>;
+  FlagTagRegistry() {}
+
+  // Flag name -> tag; a flag with several tags has several entries.
+  typedef multimap<string, string> TagMap;
+  TagMap tag_map_;
+
+  DISALLOW_COPY_AND_ASSIGN(FlagTagRegistry);
+};
+
+
+// Records the (name, tag) pair in the singleton tag registry.
+FlagTagger::FlagTagger(const char* name, const char* tag) {
+  FlagTagRegistry* const registry = FlagTagRegistry::GetInstance();
+  registry->Tag(name, tag);
+}
+
+// Nothing to release: the tagger holds no state of its own.
+FlagTagger::~FlagTagger() = default;
+
+} // namespace flag_tags_internal
+
+using flag_tags_internal::FlagTagRegistry;
+
+// Looks up the tags recorded for 'flag_name' in the singleton registry and
+// stores them in 'tags', replacing whatever 'tags' held before.
+void GetFlagTags(const string& flag_name,
+                 unordered_set<string>* tags) {
+  FlagTagRegistry* const registry = FlagTagRegistry::GetInstance();
+  registry->GetTags(flag_name, tags);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/flag_tags.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/flag_tags.h b/be/src/kudu/util/flag_tags.h
new file mode 100644
index 0000000..ddfdab1
--- /dev/null
+++ b/be/src/kudu/util/flag_tags.h
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Flag Tags provide a way to attach arbitrary textual tags to gflags in
+// a global registry. Kudu uses the following flag tags:
+//
+// - "stable":
+//         These flags are considered user-facing APIs. Therefore, the
+//         semantics of the flag should not be changed except between major
+//         versions. Similarly, they must not be removed except between major
+//         versions.
+//
+// - "evolving":
+//         These flags are considered user-facing APIs, but are not yet
+//         locked down. For example, they may pertain to a newly introduced
+//         feature that is still being actively developed. These may be changed
+//         between minor versions, but should be suitably release-noted.
+//
+//         This is the default assumed stability level, but can be tagged
+//         if you'd like to make it explicit.
+//
+// - "experimental":
+//         These flags are considered user-facing APIs, but are related to
+//         an experimental feature, or otherwise likely to change or be
+//         removed at any point. Users should not expect any compatibility
+//         of these flags.
+//
+//         Users must pass --unlock_experimental_flags to use any of these
+//         flags.
+//
+// - "hidden":
+//         These flags are for internal use only (e.g. testing) and should
+//         not be included in user-facing documentation.
+//
+// - "advanced":
+//         These flags are for advanced users or debugging purposes. While
+//         they aren't likely to be actively harmful (see "unsafe" below),
+//         they're also likely to be used only rarely and should be relegated
+//         to more detailed sections of documentation.
+//
+// - "unsafe":
+//         These flags are for internal use only (e.g. testing), and changing
+//         them away from the defaults may result in arbitrarily bad things
+//         happening. These flags are automatically excluded from user-facing
+//         documentation even if they are not also marked 'hidden'.
+//
+//         Users must pass --unlock_unsafe_flags to use any of these
+//         flags.
+//
+// - "runtime":
+//         These flags can be safely changed at runtime via an RPC to the
+//         server. Changing a flag at runtime that does not have this tag is
+//         allowed only if the user specifies a "force_unsafe_change" flag in
+//         the RPC.
+//
+//         NOTE: because gflags are simple global variables, it's important to
+//         think very carefully before tagging a flag with 'runtime'. In
+//         particular, if a string-type flag is marked 'runtime', you should
+//         never access it using the raw 'FLAGS_foo_bar' name. Instead, you
+//         must use the google::GetCommandLineFlagInfo(...) API to make a copy
+//         of the flag value under a lock. Otherwise, the 'std::string'
+//         instance could be mutated underneath the reader causing a crash.
+//
+//         For primitive-type flags, we assume that reading a variable is
+//         atomic. That is to say that a reader will either see the old value
+//         or the new one, but not some invalid value. However, for the runtime
+//         change to have any effect, you must be sure to use the FLAGS_foo_bar
+//         variable directly rather than initializing some instance variable
+//         during program startup.
+//
+// - "sensitive":
+//         The values of these flags are considered sensitive and will be
+//         redacted if --redact is set with 'flag'.
+//
+// A given flag may have zero or more tags associated with it. The system does
+// not make any attempt to check integrity of the tags - for example, it allows
+// you to mark a flag as both stable and experimental, even though this makes
+// no real sense. Nevertheless, you should strive to meet the following
+// requirements:
+//
+// - A flag should have no more than one of stable/evolving/experimental
+//   indicating its stability. 'evolving' is considered the default.
+// - A flag should have no more than one of advanced/hidden indicating
+//   visibility in documentation. If neither is specified, the flag will be in
+//   the main section of the documentation.
+// - It is likely that most 'experimental' flags will also be 'advanced' or
+//   'hidden', and that 'stable' flags are not likely to be 'hidden' or
+//   'unsafe'.
+//
+// To add a tag to a flag, use the TAG_FLAG macro. For example:
+//
+//  DEFINE_bool(sometimes_crash, false, "This flag makes Kudu crash a lot");
+//  TAG_FLAG(sometimes_crash, unsafe);
+//  TAG_FLAG(sometimes_crash, runtime);
+//
+// To fetch the list of tags associated with a flag, use 'GetFlagTags'.
+
+#ifndef KUDU_UTIL_FLAG_TAGS_H
+#define KUDU_UTIL_FLAG_TAGS_H
+
+#include "kudu/gutil/macros.h"
+
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+namespace kudu {
+
+// Enumerates the valid tag names. TAG_FLAG refers to FlagTags::<tag>, so
+// using a tag that is not listed here fails to compile.
+struct FlagTags {
+  enum {
+    stable,
+    evolving,
+    experimental,
+    hidden,
+    advanced,
+    unsafe,
+    runtime,
+    sensitive
+  };
+};
+
+// Tag the flag 'flag_name' with the given tag 'tag'.
+//
+// This verifies that 'flag_name' is a valid gflag, which must be defined
+// or declared above the use of the TAG_FLAG macro.
+//
+// This also validates that 'tag' is a valid tag as defined in the FlagTags
+// enum above.
+//
+// The tag is recorded by a FlagTagger object defined in an anonymous
+// namespace and named after both the flag and the tag.
+#define TAG_FLAG(flag_name, tag) \
+  COMPILE_ASSERT(sizeof(decltype(FLAGS_##flag_name)), flag_does_not_exist); \
+  COMPILE_ASSERT(sizeof(::kudu::FlagTags::tag), invalid_tag);   \
+  namespace {                                                     \
+    ::kudu::flag_tags_internal::FlagTagger t_##flag_name##_##tag( \
+        AS_STRING(flag_name), AS_STRING(tag));                    \
+  }
+
+// Fetch the set of tags associated with the given flag.
+//
+// If the flag is invalid or has no tags, sets 'tags' to be empty.
+void GetFlagTags(const std::string& flag_name,
+                 std::unordered_set<std::string>* tags);
+
+// ------------------------------------------------------------
+// Internal implementation details
+// ------------------------------------------------------------
+namespace flag_tags_internal {
+
+// Helper instantiated by the TAG_FLAG macro: constructing one associates
+// 'tag' with the flag called 'name'.
+class FlagTagger {
+ public:
+  FlagTagger(const char* name, const char* tag);
+  ~FlagTagger();
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(FlagTagger);
+};
+
+} // namespace flag_tags_internal
+
+} // namespace kudu
+#endif /* KUDU_UTIL_FLAG_TAGS_H */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/flag_validators-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/flag_validators-test.cc 
b/be/src/kudu/util/flag_validators-test.cc
new file mode 100644
index 0000000..3e7fb5b
--- /dev/null
+++ b/be/src/kudu/util/flag_validators-test.cc
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+
+#include "kudu/util/debug/leakcheck_disabler.h"
+#include "kudu/util/flags.h"
+#include "kudu/util/flag_validators.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/test_util.h"
+
+// Flags used only by this test. grouped_0/grouped_1 are checked together by
+// one group validator, grouped_2/grouped_3 by another.
+DEFINE_string(grouped_0, "", "First flag to set.");
+DEFINE_string(grouped_1, "", "Second flag to set.");
+DEFINE_string(grouped_2, "", "Third flag to set.");
+DEFINE_string(grouped_3, "", "Fourth flag to set.");
+
+namespace kudu {
+
+// Group validator for --grouped_0 and --grouped_1: passes only when both
+// flags are non-empty or both are empty, and logs an error otherwise.
+static bool CheckGroupedFlags01() {
+  if (FLAGS_grouped_0.empty() == FLAGS_grouped_1.empty()) {
+    return true;
+  }
+
+  LOG(ERROR) << "--grouped_0 and --grouped_1 must be set as a group";
+  return false;
+}
+GROUP_FLAG_VALIDATOR(test_group_validator01, CheckGroupedFlags01)
+
+// Group validator for --grouped_2 and --grouped_3: passes only when both
+// flags are non-empty or both are empty, and logs an error otherwise.
+static bool CheckGroupedFlags23() {
+  if (FLAGS_grouped_2.empty() == FLAGS_grouped_3.empty()) {
+    return true;
+  }
+
+  LOG(ERROR) << "--grouped_2 and --grouped_3 must be set as a group";
+  return false;
+}
+GROUP_FLAG_VALIDATOR(test_group_validator23, CheckGroupedFlags23)
+
+// Fixture for validator tests that run in the test process itself.
+class FlagsValidatorsBasicTest : public KuduTest {
+ public:
+  // Parses the given command line and asserts that ParseCommandLineFlags()
+  // returns 1.
+  void RunTest(const char** argv, int argc) {
+    char** casted_argv = const_cast<char**>(argv);
+    // ParseCommandLineFlags() calls exit(1) if it finds inconsistency in
+    // flags.
+    ASSERT_EQ(1, ParseCommandLineFlags(&argc, &casted_argv, true));
+  }
+};
+
+// Calls the registered validator for grouped_0/grouped_1 directly while
+// toggling the two flags one at a time. Both flags are empty again at the end.
+TEST_F(FlagsValidatorsBasicTest, Grouped) {
+  const auto& registered = GetFlagValidators();
+  ASSERT_EQ(2, registered.size());
+
+  const auto entry = registered.find("test_group_validator01");
+  ASSERT_NE(registered.end(), entry);
+  const auto& check = entry->second;
+
+  // Neither flag set: consistent.
+  EXPECT_TRUE(check());
+
+  // Only grouped_0 set: inconsistent.
+  FLAGS_grouped_0 = "0";
+  EXPECT_FALSE(check());
+
+  // Both set: consistent.
+  FLAGS_grouped_1 = "1";
+  EXPECT_TRUE(check());
+
+  // Only grouped_1 set: inconsistent.
+  FLAGS_grouped_0 = "";
+  EXPECT_FALSE(check());
+
+  // Neither set again: consistent.
+  FLAGS_grouped_1 = "";
+  EXPECT_TRUE(check());
+}
+
+// Fixture for tests that parse a command line inside an EXPECT_EXIT statement
+// and check the resulting exit code.
+class FlagsValidatorsDeathTest : public KuduTest {
+ public:
+  // Parses the command line and then exits with code 0. Never returns.
+  void Run(const char** argv, int argc) {
+    debug::ScopedLeakCheckDisabler disabler;
+    char** mutable_argv = const_cast<char**>(argv);
+    // If one of the custom validators finds inconsistency in flags,
+    // ParseCommandLineFlags() calls exit(1) and the exit(0) below is never
+    // reached.
+    ParseCommandLineFlags(&argc, &mutable_argv, true);
+    exit(0);
+  }
+
+  // Expects Run() to exit with code 0.
+  void RunSuccess(const char** argv, int argc) {
+    EXPECT_EXIT(Run(argv, argc), ::testing::ExitedWithCode(0), ".*");
+  }
+
+  // Expects Run() to exit with code 1 and report the flag inconsistency.
+  void RunFailure(const char** argv, int argc) {
+    EXPECT_EXIT(Run(argv, argc), ::testing::ExitedWithCode(1),
+        ".* Detected inconsistency in command-line flags; exiting");
+  }
+};
+
+// A command line carrying only the program name passes validation.
+TEST_F(FlagsValidatorsDeathTest, GroupedSuccessNoFlags) {
+  const char* program_only[] = { "argv_set_0" };
+  NO_FATALS(RunSuccess(program_only, ARRAYSIZE(program_only)));
+}
+
+// Command lines that set (or clear) both flags of a group pass validation,
+// whatever order the flags appear in.
+TEST_F(FlagsValidatorsDeathTest, GroupedSuccessSimple) {
+  // Program name plus two flags per command line.
+  static const size_t kArgvSize = 1 + 2;
+  const char* argv_sets[][kArgvSize] = {
+    { "argv_set_0", "--grouped_0=first", "--grouped_1=second", },
+    { "argv_set_1", "--grouped_0=second", "--grouped_1=first", },
+    { "argv_set_2", "--grouped_0=", "--grouped_1=", },
+    { "argv_set_3", "--grouped_1=", "--grouped_0=", },
+    { "argv_set_4", "--grouped_2=2", "--grouped_3=3", },
+    { "argv_set_5", "--grouped_3=", "--grouped_2=", },
+  };
+  for (size_t i = 0; i < ARRAYSIZE(argv_sets); ++i) {
+    RunSuccess(argv_sets[i], kArgvSize);
+  }
+}
+
+// Command lines that set just one flag of a group fail validation.
+TEST_F(FlagsValidatorsDeathTest, GroupedFailureSimple) {
+  // Program name plus a single flag per command line.
+  static const size_t kArgvSize = 1 + 1;
+  const char* argv_sets[][kArgvSize] = {
+    { "argv_set_0", "--grouped_0=a", },
+    { "argv_set_1", "--grouped_1=b", },
+    { "argv_set_2", "--grouped_2=2", },
+    { "argv_set_3", "--grouped_3=3", },
+  };
+  for (size_t i = 0; i < ARRAYSIZE(argv_sets); ++i) {
+    RunFailure(argv_sets[i], kArgvSize);
+  }
+}
+
+// Checks the behavior when exactly one of the two group validators fails:
+// every command line below satisfies one group and violates the other.
+TEST_F(FlagsValidatorsDeathTest, GroupedFailureOneOfTwoValidators) {
+  // Four flags plus the program name per command line.
+  static const size_t kArgvSize = 4 + 1;
+  const char* argv_sets[][kArgvSize] = {
+    { "argv_set_0",
+      "--grouped_0=0", "--grouped_1=1", "--grouped_2=", "--grouped_3=3", },
+    { "argv_set_1",
+      "--grouped_2=", "--grouped_3=3", "--grouped_0=0", "--grouped_1=1", },
+    { "argv_set_2",
+      "--grouped_0=0", "--grouped_1=", "--grouped_2=2", "--grouped_3=3", },
+    { "argv_set_3",
+      "--grouped_3=3", "--grouped_2=2", "--grouped_1=1", "--grouped_0=", },
+  };
+  for (size_t i = 0; i < ARRAYSIZE(argv_sets); ++i) {
+    RunFailure(argv_sets[i], kArgvSize);
+  }
+}
+
+// Passing one flag of a group with an explicitly empty value while the other
+// is non-empty fails validation, whatever order the flags appear in.
+TEST_F(FlagsValidatorsDeathTest, GroupedFailureWithEmptyValues) {
+  // Program name plus two flags per command line.
+  static const size_t kArgvSize = 1 + 2;
+  const char* argv_sets[][kArgvSize] = {
+    { "argv_set_0", "--grouped_0=a", "--grouped_1=", },
+    { "argv_set_1", "--grouped_1=", "--grouped_0=a", },
+    { "argv_set_2", "--grouped_0=", "--grouped_1=b", },
+    { "argv_set_3", "--grouped_1=b", "--grouped_0=", },
+  };
+  for (size_t i = 0; i < ARRAYSIZE(argv_sets); ++i) {
+    RunFailure(argv_sets[i], kArgvSize);
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/flag_validators.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/flag_validators.cc 
b/be/src/kudu/util/flag_validators.cc
new file mode 100644
index 0000000..f90fe2e
--- /dev/null
+++ b/be/src/kudu/util/flag_validators.cc
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/flag_validators.h"
+
+#include <string>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/singleton.h"
+
+using std::string;
+
+namespace kudu {
+namespace flag_validation_internal {
+
+// Singleton that keeps every registered group flag validator, keyed by the
+// validator's name.
+class FlagValidatorRegistry {
+ public:
+  // Returns the singleton instance of the registry.
+  static FlagValidatorRegistry* GetInstance() {
+    return Singleton<FlagValidatorRegistry>::get();
+  }
+
+  // Stores 'func' under 'name' via InsertOrDie().
+  void Register(const string& name, const FlagValidator& func) {
+    InsertOrDie(&validators_, name, func);
+  }
+
+  // Gives read-only access to all registered validators.
+  const FlagValidatorsMap& validators() {
+    return validators_;
+  }
+
+ private:
+  friend class Singleton<FlagValidatorRegistry>;
+  FlagValidatorRegistry() = default;
+
+  FlagValidatorsMap validators_;
+
+  DISALLOW_COPY_AND_ASSIGN(FlagValidatorRegistry);
+};
+
+
+// Adds 'validator' to the singleton registry under 'name'.
+Registrator::Registrator(const char* name, const FlagValidator& validator) {
+  FlagValidatorRegistry* const registry = FlagValidatorRegistry::GetInstance();
+  registry->Register(name, validator);
+}
+
+} // namespace flag_validation_internal
+
+
+// Returns the map of all registered group validators, keyed by name.
+const FlagValidatorsMap& GetFlagValidators() {
+  return flag_validation_internal::FlagValidatorRegistry::GetInstance()
+      ->validators();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/flag_validators.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/flag_validators.h 
b/be/src/kudu/util/flag_validators.h
new file mode 100644
index 0000000..02cc2dd
--- /dev/null
+++ b/be/src/kudu/util/flag_validators.h
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "kudu/gutil/macros.h"
+
+#include <functional>
+#include <map>
+#include <string>
+
+namespace kudu {
+
+// The validation function: takes no parameters and returns a boolean. A group
+// validator should return 'true' if validation was successful, or 'false'
+// otherwise.
+typedef std::function<bool(void)> FlagValidator;
+
+// The group validator registry's representation as seen from the outside:
+// the key is the name of the group validator, the value is the validation
+// function.
+typedef std::map<std::string, FlagValidator> FlagValidatorsMap;
+
+// Register a 'group' validator for command-line flags. In contrast with the
+// standard (built-in) gflag validators registered by the DEFINE_validator()
+// macro, group validators are run at a later phase in the context of the
+// main() function. A group validator has a guarantee that all command-line
+// flags have been parsed, individually validated (via standard validators),
+// and their values are already set at the time when the validator runs.
+//
+// The first macro parameter is the name of the validator, the second parameter
+// is the validation function as is. The name must be unique across all
+// registered group validators.
+//
+// The validation function takes no parameters and returns 'true' in case of
+// successful validation, otherwise it returns 'false'. If at least one of the
+// registered group validators returns 'false', exit(1) is called.
+//
+// Usage guideline:
+//
+//   * Use the DEFINE_validator() macro if you need to validate an individual
+//     gflag's value
+//
+//   * Use the GROUP_FLAG_VALIDATOR() macro only if you need to validate a set
+//     of gflag values against one another, having the guarantee that their
+//     values are already set when the validation function runs.
+//
+// Sample usage:
+//
+//  static bool ValidateGroupedFlags() {
+//    bool has_a = !FLAGS_a.empty();
+//    bool has_b = !FLAGS_b.empty();
+//
+//    if (has_a != has_b) {
+//      LOG(ERROR) << "--a and --b must be set as a group";
+//      return false;
+//    }
+//
+//    return true;
+//  }
+//  GROUP_FLAG_VALIDATOR(grouped_flags_validator, ValidateGroupedFlags);
+//
+#define GROUP_FLAG_VALIDATOR(name, func) \
+  namespace {                                               \
+    ::kudu::flag_validation_internal::Registrator v_##name( \
+        AS_STRING(name), (func));                           \
+  }
+
+// Returns all registered group flag validators, keyed by validator name.
+const FlagValidatorsMap& GetFlagValidators();
+
+namespace flag_validation_internal {
+
+// This is a utility class which registers a group validator upon
+// instantiation.
+class Registrator {
+ public:
+  // Registers 'validator' under 'name' in the group validator registry as a
+  // side effect of construction. The name must be unique among all group
+  // validators.
+  Registrator(const char* name, const FlagValidator& validator);
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(Registrator);
+};
+
+} // namespace flag_validation_internal
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/flags-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/flags-test.cc b/be/src/kudu/util/flags-test.cc
new file mode 100644
index 0000000..dd5acd0
--- /dev/null
+++ b/be/src/kudu/util/flags-test.cc
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+#include <string>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/env.h"
+#include "kudu/util/flags.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/test_util.h"
+
+// Flags defined for TestNonDefaultFlags below; all string flags default to "default".
+DEFINE_string(test_nondefault_ff, "default",
+             "Check if we track non defaults from flagfile");
+DEFINE_string(test_nondefault_explicit, "default",
+             "Check if we track explicitly set non defaults");
+DEFINE_string(test_default_ff, "default",
+             "Check if we track defaults from flagfile");
+DEFINE_string(test_default_explicit, "default",
+             "Check if we track explicitly set defaults");
+DEFINE_bool(test_sensitive_flag, false, "a sensitive flag");
+TAG_FLAG(test_sensitive_flag, sensitive);
+
+DECLARE_bool(never_fsync);
+
+namespace kudu {
+
+class FlagsTest : public KuduTest {};
+
+// Checks that GetNonDefaultFlags() reports the flags whose values differ from
+// a baseline snapshot, whether they were set on the command line or through a
+// flagfile, omits flags that were re-set to their default value, and shows a
+// redacted value for a sensitive flag.
+TEST_F(FlagsTest, TestNonDefaultFlags) {
+  // Baseline snapshot, taken before any flag is touched.
+  const GFlagsMap baseline_flags = GetFlagsMap();
+
+  // A flagfile that changes one flag and re-states the default of another.
+  const std::string flagfile_path(GetTestPath("test_nondefault_flags"));
+  const std::string flagfile_contents =
+      "--test_nondefault_ff=nondefault\n"
+      "--test_default_ff=default";
+  CHECK_OK(WriteStringToFile(
+      Env::Default(),
+      Slice(flagfile_contents.data(), flagfile_contents.size()),
+      flagfile_path));
+
+  // The command line changes one flag explicitly, explicitly keeps another
+  // at its default, and pulls in the flagfile written above.
+  const std::string flagfile_flag =
+      strings::Substitute("--flagfile=$0", flagfile_path);
+  const char* argv[] = {
+    "some_executable_file",
+    "--test_nondefault_explicit=nondefault",
+    "--test_default_explicit=default",
+    flagfile_flag.c_str()
+  };
+  int argc = 4;
+  char** casted_argv = const_cast<char**>(argv);
+  ParseCommandLineFlags(&argc, &casted_argv, true);
+
+  // A sensitive flag set to a non-default value must be reported with a
+  // redacted value.
+  FLAGS_test_sensitive_flag = true;
+  const std::string result = GetNonDefaultFlags(baseline_flags);
+
+  const std::vector<const char*> must_appear = {
+    "--test_nondefault_explicit=nondefault",
+    "--test_nondefault_ff=nondefault",
+    flagfile_flag.c_str()
+  };
+  for (const char* flag_text : must_appear) {
+    ASSERT_STR_CONTAINS(result, flag_text);
+  }
+
+  const std::vector<const char*> must_not_appear = {
+    "--test_default_explicit",
+    "--test_default_ff"
+  };
+  for (const char* flag_text : must_not_appear) {
+    ASSERT_STR_NOT_CONTAINS(result, flag_text);
+  }
+
+  ASSERT_STR_CONTAINS(result, strings::Substitute("--test_sensitive_flag=$0",
+                                                  kRedactionMessage));
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/flags.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/flags.cc b/be/src/kudu/util/flags.cc
new file mode 100644
index 0000000..6b57f54
--- /dev/null
+++ b/be/src/kudu/util/flags.cc
@@ -0,0 +1,555 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/flags.h"
+
+#include <iostream>
+#include <map>
+#include <sstream>
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include <sys/stat.h>
+#include <sys/types.h>
+
+#include <gflags/gflags.h>
+#include <gperftools/heap-profiler.h>
+
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/flag_validators.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/os-util.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/string_case.h"
+#include "kudu/util/url-coding.h"
+#include "kudu/util/version_info.h"
+
+using google::CommandLineFlagInfo;
+
+using std::cout;
+using std::endl;
+using std::string;
+using std::stringstream;
+using std::unordered_set;
+
+using strings::Substitute;
+
+// Every binary initializes its flags through this file, which makes it a
+// convenient place to offer some global flags as well.
+DEFINE_bool(dump_metrics_json, false,
+            "Dump a JSON document describing all of the metrics which may be "
+            "emitted by this binary.");
+TAG_FLAG(dump_metrics_json, hidden);
+
+DEFINE_bool(enable_process_lifetime_heap_profiling, false,
+            "Enables heap profiling for the lifetime of the process. Profile "
+            "output will be stored in the directory specified by "
+            "-heap_profile_path. Enabling this option will disable the "
+            "on-demand/remote server profile handlers.");
+TAG_FLAG(enable_process_lifetime_heap_profiling, stable);
+TAG_FLAG(enable_process_lifetime_heap_profiling, advanced);
+
+DEFINE_string(heap_profile_path, "",
+              "Output path to store heap profiles. If not set profiles are "
+              "stored in /tmp/<process-name>.<pid>.<n>.heap.");
+TAG_FLAG(heap_profile_path, stable);
+TAG_FLAG(heap_profile_path, advanced);
+
+DEFINE_bool(disable_core_dumps, false,
+            "Disable core dumps when this process crashes.");
+TAG_FLAG(disable_core_dumps, advanced);
+TAG_FLAG(disable_core_dumps, evolving);
+
+DEFINE_string(umask, "077",
+              "The umask that will be used when creating files and "
+              "directories. Permissions of top-level data directories will "
+              "also be modified at start-up to conform to the given umask. "
+              "Changing this value may enable unauthorized local users to "
+              "read or modify data stored by Kudu.");
+TAG_FLAG(umask, advanced);
+
+// gflags validator for --umask. Accepts the value only if it parses as an
+// octal number and leaves every owner permission bit cleared; logs an error
+// and returns false otherwise.
+static bool ValidateUmask(const char* /*flagname*/, const string& value) {
+  uint32_t mask_bits;
+  const bool is_octal = safe_strtou32_base(value.c_str(), &mask_bits, 8);
+  if (!is_octal) {
+    LOG(ERROR) << "Invalid umask: must be an octal string";
+    return false;
+  }
+
+  // A umask that strips any owner permission would make us create files
+  // that we cannot read back ourselves.
+  const uint32_t kOwnerBits = 0700;
+  const bool restricts_owner = (mask_bits & kOwnerBits) != 0;
+  if (restricts_owner) {
+    LOG(ERROR) << "Invalid umask value: must not restrict owner permissions";
+    return false;
+  }
+  return true;
+}
+
+DEFINE_validator(umask, &ValidateUmask);
+
+// Opt-in switches for flags tagged 'experimental' and 'unsafe'.
+DEFINE_bool(unlock_experimental_flags, false,
+            "Unlock flags marked as 'experimental'. These flags are not "
+            "guaranteed to be maintained across releases of Kudu, and may "
+            "enable features or behavior known to be unstable. Use at your "
+            "own risk.");
+TAG_FLAG(unlock_experimental_flags, advanced);
+TAG_FLAG(unlock_experimental_flags, stable);
+
+DEFINE_bool(unlock_unsafe_flags, false,
+            "Unlock flags marked as 'unsafe'. These flags are not guaranteed "
+            "to be maintained across releases of Kudu, and enable features "
+            "or behavior known to be unsafe. Use at your own risk.");
+TAG_FLAG(unlock_unsafe_flags, advanced);
+TAG_FLAG(unlock_unsafe_flags, stable);
+
+// Controls which categories of sensitive data are redacted; the value is
+// checked by the validator registered for this flag.
+DEFINE_string(redact, "all",
+              "Comma-separated list of redactions. Supported options are "
+              "'flag', 'log', 'all', and 'none'. If 'flag' is specified, "
+              "configuration flags which may include sensitive data will be "
+              "redacted whenever server configuration is emitted. If 'log' "
+              "is specified, row data will be redacted from log and error "
+              "messages. If 'all' is specified, all of above will be "
+              "redacted. If 'none' is specified, no redaction will occur.");
+TAG_FLAG(redact, advanced);
+TAG_FLAG(redact, evolving);
+
+// gflags validator for --redact. Besides validating the value, it also
+// applies it: kudu::g_should_redact_log and kudu::g_should_redact_flag are
+// reset and then switched on according to the options given.
+//
+// Returns false (after logging an error) for an unknown option, or when
+// 'all' / 'none' is combined with other options. Options seen before the
+// offending one have already been applied at that point.
+static bool ValidateRedact(const char* /*flagname*/, const string& value) {
+  // Start from "redact nothing"; the options below switch redaction on.
+  kudu::g_should_redact_flag = false;
+  kudu::g_should_redact_log = false;
+
+  // Options are matched case-insensitively.
+  string options;
+  kudu::ToUpperCase(value, &options);
+
+  // 'none', '' and 'all' are only valid when they are the entire value.
+  if (options.empty() || options == "NONE") {
+    return true;
+  }
+  if (options == "ALL") {
+    kudu::g_should_redact_flag = true;
+    kudu::g_should_redact_log = true;
+    return true;
+  }
+
+  for (const auto& option :
+       strings::Split(options, ",", strings::SkipEmpty())) {
+    if (option == "FLAG") {
+      kudu::g_should_redact_flag = true;
+    } else if (option == "LOG") {
+      kudu::g_should_redact_log = true;
+    } else if (option == "NONE" || option == "ALL") {
+      LOG(ERROR) << "Invalid redaction options: "
+                 << value << ", '" << option
+                 << "' must be specified by itself.";
+      return false;
+    } else {
+      LOG(ERROR) << "Invalid redaction type: " << option
+                 << ". Available types are 'flag', 'log', 'all', and 'none'.";
+      return false;
+    }
+  }
+  return true;
+}
+
+DEFINE_validator(redact, &ValidateRedact);
+// Tag a bunch of the flags that we inherit from glog/gflags.
+
+//------------------------------------------------------------
+// GLog flags
+//------------------------------------------------------------
+// Most of these are considered stable. The email-related flags are hidden;
+// alsologtoemail and logemaillevel are also marked unsafe because sending
+// email inline from a server is a pretty bad idea.
+DECLARE_string(alsologtoemail);
+TAG_FLAG(alsologtoemail, hidden);
+TAG_FLAG(alsologtoemail, unsafe);
+
+// --alsologtostderr is deprecated in favor of --stderrthreshold
+DECLARE_bool(alsologtostderr);
+TAG_FLAG(alsologtostderr, hidden);
+TAG_FLAG(alsologtostderr, runtime);
+
+DECLARE_bool(colorlogtostderr);
+TAG_FLAG(colorlogtostderr, stable);
+TAG_FLAG(colorlogtostderr, runtime);
+
+DECLARE_bool(drop_log_memory);
+TAG_FLAG(drop_log_memory, advanced);
+TAG_FLAG(drop_log_memory, runtime);
+
+DECLARE_string(log_backtrace_at);
+TAG_FLAG(log_backtrace_at, advanced);
+
+DECLARE_string(log_dir);
+TAG_FLAG(log_dir, stable);
+
+DECLARE_string(log_link);
+TAG_FLAG(log_link, stable);
+TAG_FLAG(log_link, advanced);
+
+DECLARE_bool(log_prefix);
+TAG_FLAG(log_prefix, stable);
+TAG_FLAG(log_prefix, advanced);
+TAG_FLAG(log_prefix, runtime);
+
+DECLARE_int32(logbuflevel);
+TAG_FLAG(logbuflevel, advanced);
+TAG_FLAG(logbuflevel, runtime);
+DECLARE_int32(logbufsecs);
+TAG_FLAG(logbufsecs, advanced);
+TAG_FLAG(logbufsecs, runtime);
+
+DECLARE_int32(logemaillevel);
+TAG_FLAG(logemaillevel, hidden);
+TAG_FLAG(logemaillevel, unsafe);
+
+DECLARE_string(logmailer);
+TAG_FLAG(logmailer, hidden);
+
+DECLARE_bool(logtostderr);
+TAG_FLAG(logtostderr, stable);
+TAG_FLAG(logtostderr, runtime);
+
+DECLARE_int32(max_log_size);
+TAG_FLAG(max_log_size, stable);
+TAG_FLAG(max_log_size, runtime);
+
+DECLARE_int32(minloglevel);
+TAG_FLAG(minloglevel, stable);
+TAG_FLAG(minloglevel, advanced);
+TAG_FLAG(minloglevel, runtime);
+
+DECLARE_int32(stderrthreshold);
+TAG_FLAG(stderrthreshold, stable);
+TAG_FLAG(stderrthreshold, advanced);
+TAG_FLAG(stderrthreshold, runtime);
+
+DECLARE_bool(stop_logging_if_full_disk);
+TAG_FLAG(stop_logging_if_full_disk, stable);
+TAG_FLAG(stop_logging_if_full_disk, advanced);
+TAG_FLAG(stop_logging_if_full_disk, runtime);
+
+DECLARE_int32(v);
+TAG_FLAG(v, stable);
+TAG_FLAG(v, advanced);
+TAG_FLAG(v, runtime);
+
+DECLARE_string(vmodule);
+TAG_FLAG(vmodule, stable);
+TAG_FLAG(vmodule, advanced);
+
+DECLARE_bool(symbolize_stacktrace);
+TAG_FLAG(symbolize_stacktrace, stable);
+TAG_FLAG(symbolize_stacktrace, runtime);
+TAG_FLAG(symbolize_stacktrace, advanced);
+
+//------------------------------------------------------------
+// GFlags flags (inherited from the gflags library)
+//------------------------------------------------------------
+DECLARE_string(flagfile);
+TAG_FLAG(flagfile, stable);
+
+DECLARE_string(fromenv);
+TAG_FLAG(fromenv, stable);
+TAG_FLAG(fromenv, advanced);
+
+DECLARE_string(tryfromenv);
+TAG_FLAG(tryfromenv, stable);
+TAG_FLAG(tryfromenv, advanced);
+
+DECLARE_string(undefok);
+TAG_FLAG(undefok, stable);
+TAG_FLAG(undefok, advanced);
+
+DECLARE_int32(tab_completion_columns);
+TAG_FLAG(tab_completion_columns, stable);
+TAG_FLAG(tab_completion_columns, hidden);
+
+DECLARE_string(tab_completion_word);
+TAG_FLAG(tab_completion_word, stable);
+TAG_FLAG(tab_completion_word, hidden);
+
+DECLARE_bool(help);
+TAG_FLAG(help, stable);
+
+DECLARE_bool(helpfull);
+// -helpfull is tagged hidden because it's the same as -help for now.
+TAG_FLAG(helpfull, stable);
+TAG_FLAG(helpfull, hidden);
+
+DECLARE_string(helpmatch);
+TAG_FLAG(helpmatch, stable);
+TAG_FLAG(helpmatch, advanced);
+
+DECLARE_string(helpon);
+TAG_FLAG(helpon, stable);
+TAG_FLAG(helpon, advanced);
+
+DECLARE_bool(helppackage);
+TAG_FLAG(helppackage, stable);
+TAG_FLAG(helppackage, advanced);
+
+DECLARE_bool(helpshort);
+TAG_FLAG(helpshort, stable);
+TAG_FLAG(helpshort, advanced);
+
+DECLARE_bool(helpxml);
+TAG_FLAG(helpxml, stable);
+TAG_FLAG(helpxml, advanced);
+
+DECLARE_bool(version);
+TAG_FLAG(version, stable);
+
+namespace kudu {
+
+// Filled in by SetUmask() after flags are parsed; starts as (uint32_t)-1.
+uint32_t g_parsed_umask = -1;
+
+namespace {
+
+// Appends "<tag>txt</tag>" to '*r', passing 'txt' through
+// EscapeForHtmlToString() first.
+void AppendXMLTag(const char* tag, const string& txt, string* r) {
+  const string escaped_text = EscapeForHtmlToString(txt);
+  strings::SubstituteAndAppend(r, "<$0>$1</$0>", tag, escaped_text);
+}
+
+// Renders one flag as a "<flag>...</flag>" XML element containing its file,
+// name, meaning, default value, current value, type and comma-joined tags.
+static string DescribeOneFlagInXML(const CommandLineFlagInfo& flag) {
+  unordered_set<string> flag_tags;
+  GetFlagTags(flag.name, &flag_tags);
+  const string joined_tags = JoinStrings(flag_tags, ",");
+
+  string xml = "<flag>";
+  AppendXMLTag("file", flag.filename, &xml);
+  AppendXMLTag("name", flag.name, &xml);
+  AppendXMLTag("meaning", flag.description, &xml);
+  AppendXMLTag("default", flag.default_value, &xml);
+  AppendXMLTag("current", flag.current_value, &xml);
+  AppendXMLTag("type", flag.type, &xml);
+  AppendXMLTag("tags", joined_tags, &xml);
+  xml.append("</flag>");
+  return xml;
+}
+
+// Writes an XML description of the program and of every flag to stdout,
+// then terminates the process with exit status 1.
+void DumpFlagsXML() {
+  vector<CommandLineFlagInfo> all_flags;
+  GetAllFlags(&all_flags);
+
+  const string program =
+      EscapeForHtmlToString(BaseName(google::ProgramInvocationShortName()));
+  const string usage = EscapeForHtmlToString(google::ProgramUsage());
+
+  cout << "<?xml version=\"1.0\"?>" << endl
+       << "<AllFlags>" << endl
+       << strings::Substitute("<program>$0</program>", program) << endl
+       << strings::Substitute("<usage>$0</usage>", usage) << endl;
+
+  for (const CommandLineFlagInfo& info : all_flags) {
+    cout << DescribeOneFlagInXML(info) << endl;
+  }
+
+  cout << "</AllFlags>" << endl;
+  exit(1);
+}
+
+// Prints the full version information to stdout and exits with status 0.
+void ShowVersionAndExit() {
+  const auto& version_text = VersionInfo::GetAllVersionInfo();
+  cout << version_text << endl;
+  exit(0);
+}
+
+// Inspects every flag that carries 'tag' and is not marked as default.
+//
+// When 'unlocked' is true, each such flag is reported with a warning and
+// the function returns false. When 'unlocked' is false, each such flag is
+// reported as an error, a summary explaining how to unlock the category is
+// logged, and the function returns true. Returns false when no flag with
+// 'tag' is in use.
+bool CheckFlagsAndWarn(const string& tag, bool unlocked) {
+  vector<CommandLineFlagInfo> all_flags;
+  GetAllFlags(&all_flags);
+
+  int num_locked_in_use = 0;
+  for (const auto& info : all_flags) {
+    if (info.is_default) continue;
+
+    unordered_set<string> flag_tags;
+    GetFlagTags(info.name, &flag_tags);
+    if (!ContainsKey(flag_tags, tag)) continue;
+
+    if (!unlocked) {
+      LOG(ERROR) << "Flag --" << info.name << " is " << tag
+                 << " and unsupported.";
+      ++num_locked_in_use;
+      continue;
+    }
+    LOG(WARNING) << "Enabled " << tag << " flag: --" << info.name << "="
+                 << info.current_value;
+  }
+
+  // The counter only grows while the category is locked, so a zero count
+  // covers both "unlocked" and "nothing in use".
+  if (num_locked_in_use == 0) {
+    return false;
+  }
+  LOG(ERROR) << num_locked_in_use << " " << tag << " flag(s) in use.";
+  LOG(ERROR) << "Use --unlock_" << tag
+             << "_flags to proceed at your own risk.";
+  return true;
+}
+
+// Makes sure that unsafe or experimental flags are only in use when the
+// matching --unlock_*_flags switch was given; exits with status 1 otherwise.
+void CheckFlagsAllowed() {
+  // Both categories are always checked, so every offending flag is logged
+  // before the process exits.
+  const bool unsafe_blocked =
+      CheckFlagsAndWarn("unsafe", FLAGS_unlock_unsafe_flags);
+  const bool experimental_blocked =
+      CheckFlagsAndWarn("experimental", FLAGS_unlock_experimental_flags);
+
+  if (unsafe_blocked || experimental_blocked) {
+    exit(1);
+  }
+}
+
+// Runs the 'late phase' group validators, which may only run once all flags
+// have been parsed and individually validated. Exits with status 1 if any
+// of them fails.
+void RunCustomValidators() {
+  bool all_passed = true;
+  for (const auto& entry : GetFlagValidators()) {
+    // Keep going after a failure so that every validator gets to run.
+    if (!entry.second()) {
+      all_passed = false;
+    }
+  }
+
+  if (!all_passed) {
+    LOG(ERROR) << "Detected inconsistency in command-line flags; exiting";
+    exit(1);
+  }
+}
+
+// Returns the value to display for 'flag': the redaction message when the
+// flag is tagged 'sensitive' and flag redaction is enabled, or its current
+// value otherwise. With EscapeMode::HTML the result is HTML-escaped.
+string CheckFlagAndRedact(const CommandLineFlagInfo& flag, EscapeMode mode) {
+  unordered_set<string> flag_tags;
+  GetFlagTags(flag.name, &flag_tags);
+
+  const bool redact =
+      g_should_redact_flag && ContainsKey(flag_tags, "sensitive");
+
+  string shown_value;
+  if (redact) {
+    shown_value = kRedactionMessage;
+  } else {
+    shown_value = flag.current_value;
+  }
+
+  if (mode == EscapeMode::HTML) {
+    return EscapeForHtmlToString(shown_value);
+  }
+  return shown_value;
+}
+
+// Parses --umask into g_parsed_umask and installs it as the process umask,
+// logging at verbosity level 2 when the mask actually changed.
+void SetUmask() {
+  // The ValidateUmask flag validator above has already rejected bad values
+  // with a readable error message, so a failure here is a hard error.
+  CHECK(safe_strtou32_base(FLAGS_umask.c_str(), &g_parsed_umask, 8));
+
+  const uint32_t previous_mask = umask(g_parsed_umask);
+  if (previous_mask == g_parsed_umask) {
+    return;
+  }
+  VLOG(2) << "Changed umask from " << StringPrintf("%03o", previous_mask)
+          << " to " << StringPrintf("%03o", g_parsed_umask);
+}
+
+} // anonymous namespace
+
+// Parses the command line through gflags (leaving the help flags for
+// HandleCommonFlags() to act on), then runs the common flag handling.
+// Returns whatever google::ParseCommandLineNonHelpFlags() returned.
+int ParseCommandLineFlags(int* argc, char*** argv, bool remove_flags) {
+  // The stock logbufsecs default of 30 seconds is a bit too long, so lower
+  // the default to 5.
+  google::SetCommandLineOptionWithMode(
+      "logbufsecs", "5", google::FlagSettingMode::SET_FLAGS_DEFAULT);
+
+  const int parse_result =
+      google::ParseCommandLineNonHelpFlags(argc, argv, remove_flags);
+  HandleCommonFlags();
+  return parse_result;
+}
+
+// Post-processes the parsed flags, in this order: rejects locked
+// unsafe/experimental flags, runs the group validators, services
+// --helpxml / --dump_metrics_json / --version / the standard help flags,
+// and finally applies the heap profile path default, the core dump
+// setting, the umask and (when built with tcmalloc) heap profiling.
+void HandleCommonFlags() {
+  // Both checks call exit(1) on failure, so nothing below runs with a
+  // rejected configuration.
+  CheckFlagsAllowed();
+  RunCustomValidators();
+
+  // Only one informational flag is serviced: --helpxml wins over
+  // --dump_metrics_json, which wins over --version.
+  if (FLAGS_helpxml) {
+    DumpFlagsXML();
+  } else if (FLAGS_dump_metrics_json) {
+    MetricPrototypeRegistry::get()->WriteAsJsonAndExit();
+  } else if (FLAGS_version) {
+    ShowVersionAndExit();
+  } else {
+    google::HandleCommandLineHelpFlags();
+  }
+
+  // Default the profile path to /tmp/<program short name>.<pid>.
+  if (FLAGS_heap_profile_path.empty()) {
+    FLAGS_heap_profile_path = strings::Substitute(
+        "/tmp/$0.$1", google::ProgramInvocationShortName(), getpid());
+  }
+
+  if (FLAGS_disable_core_dumps) {
+    DisableCoreDumps();
+  }
+
+  SetUmask();
+
+#ifdef TCMALLOC_ENABLED
+  if (FLAGS_enable_process_lifetime_heap_profiling) {
+    HeapProfilerStart(FLAGS_heap_profile_path.c_str());
+  }
+#endif
+}
+
+// Renders every flag as a "--name=value" line. Values go through
+// CheckFlagAndRedact(); with EscapeMode::HTML the flag names are
+// HTML-escaped as well.
+string CommandlineFlagsIntoString(EscapeMode mode) {
+  vector<CommandLineFlagInfo> all_flags;
+  GetAllFlags(&all_flags);
+
+  string rendered;
+  for (const auto& info : all_flags) {
+    rendered.append("--");
+    if (mode == EscapeMode::NONE) {
+      rendered.append(info.name);
+    } else if (mode == EscapeMode::HTML) {
+      rendered.append(EscapeForHtmlToString(info.name));
+    }
+    rendered.append("=");
+    rendered.append(CheckFlagAndRedact(info, mode));
+    rendered.append("\n");
+  }
+  return rendered;
+}
+
+// Returns the flags whose current value differs from the one recorded in
+// 'default_flags', as newline-separated "--name=value" entries with no
+// trailing newline. Values of flags tagged as sensitive are redacted when
+// --redact includes 'flag'.
+string GetNonDefaultFlags(const GFlagsMap& default_flags) {
+  vector<CommandLineFlagInfo> flags;
+  GetAllFlags(&flags);
+
+  // Entries are appended to a plain string: asking a stringstream for its
+  // contents on every entry would copy the whole buffer each time.
+  string args;
+  for (const auto& flag : flags) {
+    // A flag not marked as default has merely been rewritten. That doesn't
+    // mean it was set on the command line, or even that its value truly
+    // differs from the default, so compare against the snapshot as well.
+    if (flag.is_default) continue;
+
+    // It's very unlikely, but still possible, that the flag is missing from
+    // the defaults; such a flag is reported too.
+    const auto default_flag = default_flags.find(flag.name);
+    if (default_flag != default_flags.end() &&
+        flag.current_value == default_flag->second.current_value) {
+      continue;
+    }
+
+    if (!args.empty()) {
+      args += '\n';
+    }
+    args += "--";
+    args += flag.name;
+    args += '=';
+    // Redact the flags tagged as sensitive, if --redact is set with 'flag'.
+    args += CheckFlagAndRedact(flag, EscapeMode::NONE);
+  }
+  return args;
+}
+
+// Takes a snapshot of all flags as they are right now, indexed by flag name.
+GFlagsMap GetFlagsMap() {
+  vector<CommandLineFlagInfo> all_flags;
+  GetAllFlags(&all_flags);
+
+  GFlagsMap by_name;
+  for (auto& info : all_flags) {
+    // Copy the name first: 'info' itself is moved into the map.
+    string key = info.name;
+    by_name.emplace(std::move(key), std::move(info));
+  }
+  return by_name;
+}
+
+} // namespace kudu


Reply via email to