This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new f49a548fe KUDU-613: Introduce SLRU cache
f49a548fe is described below

commit f49a548fe4bac91679744b4dd0efa895ff9c09e2
Author: Mahesh Reddy <[email protected]>
AuthorDate: Fri Oct 20 16:24:45 2023 -0400

    KUDU-613: Introduce SLRU cache
    
    This patch introduces the SLRU cache that has two internal segments,
    the probationary and protected, to protect the cache from long/random
    reads. The SLRU cache has a parameter named 'lookups_threshold_' that
    determines the minimum amount of times an entry can be accessed
    before it's upgraded to the protected segment. Any random scans would
    then only evict entries from the probationary segment.
    
    Both the protected and probationary segment have their own
    configurable capacities. When the protected segment is at capacity,
    any entries evicted will be added to the MRU end of the
    probationary segment.
    
    Metrics will be added in a follow-up patch.
    
    Ran cache benchmarks for RELEASE build.
    Used default flag values in cache-bench.
    Build ran locally on macOS, 6 cores and 2.6GHz.
    
    Here are some benchmark numbers for SLRU cache:
    Test case           | Algorithm | Lookups/sec  | Hit rate
    ZIPFIAN ratio=1.00x | LRU       | 11.01M       | 99.8%
    ZIPFIAN ratio=1.00x | SLRU      | 10.62M       | 99.9%
    ZIPFIAN ratio=3.00x | LRU       | 11.06M       | 95.9%
    ZIPFIAN ratio=3.00x | SLRU      | 9.69M        | 95.9%
    UNIFORM ratio=1.00x | LRU       | 8.54M        | 99.7%
    UNIFORM ratio=1.00x | SLRU      | 6.18M        | 99.7%
    UNIFORM ratio=3.00x | LRU       | 6.53M        | 33.3%
    UNIFORM ratio=3.00x | SLRU      | 4.99M        | 33.3%
    
    Change-Id: I45531534a2049dd38c002f4dc7982df9fd46e0bb
    Reviewed-on: http://gerrit.cloudera.org:8080/20607
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/util/CMakeLists.txt     |   2 +
 src/kudu/util/cache-bench.cc     |  37 ++-
 src/kudu/util/cache.cc           |   2 +
 src/kudu/util/cache.h            |   3 +
 src/kudu/util/slru_cache-test.cc | 679 +++++++++++++++++++++++++++++++++++++++
 src/kudu/util/slru_cache.cc      | 488 ++++++++++++++++++++++++++++
 src/kudu/util/slru_cache.h       | 280 ++++++++++++++++
 7 files changed, 1486 insertions(+), 5 deletions(-)

diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index d32450c97..7bd37c5ea 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -244,6 +244,7 @@ set(UTIL_SRCS
   ${SEMAPHORE_CC}
   signal.cc
   slice.cc
+  slru_cache.cc
   spinlock_profiling.cc
   status.cc
   status_callback.cc
@@ -574,6 +575,7 @@ ADD_KUDU_TEST(rwc_lock-test RUN_SERIAL true)
 ADD_KUDU_TEST(safe_math-test)
 ADD_KUDU_TEST(scoped_cleanup-test)
 ADD_KUDU_TEST(slice-test)
+ADD_KUDU_TEST(slru_cache-test)
 ADD_KUDU_TEST(sorted_disjoint_interval_list-test)
 ADD_KUDU_TEST(spinlock_profiling-test)
 ADD_KUDU_TEST(stack_watchdog-test PROCESSORS 2)
diff --git a/src/kudu/util/cache-bench.cc b/src/kudu/util/cache-bench.cc
index 64016b247..5c11689d9 100644
--- a/src/kudu/util/cache-bench.cc
+++ b/src/kudu/util/cache-bench.cc
@@ -38,6 +38,7 @@
 #include "kudu/util/random.h"
 #include "kudu/util/random_util.h"
 #include "kudu/util/slice.h"
+#include "kudu/util/slru_cache.h"
 #include "kudu/util/test_util.h"
 
 DEFINE_int32(num_threads, 16, "The number of threads to access the cache 
concurrently.");
@@ -54,6 +55,10 @@ namespace kudu {
 
 // Benchmark a 1GB cache.
 static constexpr int kCacheCapacity = 1024 * 1024 * 1024;
+static constexpr int kProbationarySegmentCapacity = 820 * 1024 * 1024;
+static constexpr int kProtectedSegmentCapacity = 204 * 1024 * 1024;
+static constexpr uint32_t kLookups = 3;
+
 // Use 4kb entries.
 static constexpr int kEntrySize = 4 * 1024;
 
@@ -74,18 +79,30 @@ struct BenchSetup {
   // in the cache.
   double dataset_cache_ratio;
 
+  Cache::EvictionPolicy eviction_policy;
+
   string ToString() const {
     string ret;
     switch (pattern) {
       case Pattern::ZIPFIAN: ret += "ZIPFIAN"; break;
       case Pattern::UNIFORM: ret += "UNIFORM"; break;
     }
+    if (eviction_policy == Cache::EvictionPolicy::SLRU) {
+      ret += " SLRU";
+    } else {
+      ret += " LRU";
+    }
     ret += StringPrintf(" ratio=%.2fx n_unique=%d", dataset_cache_ratio, 
max_key());
     return ret;
   }
 
   // Return the maximum cache key to be generated for a lookup.
   uint32_t max_key() const {
+    if (eviction_policy == Cache::EvictionPolicy::SLRU) {
+      return static_cast<int64_t>(
+          (kProbationarySegmentCapacity + kProtectedSegmentCapacity) * 
dataset_cache_ratio)
+          / kEntrySize;
+    }
     return static_cast<int64_t>(kCacheCapacity * dataset_cache_ratio) / 
kEntrySize;
   }
 };
@@ -95,7 +112,13 @@ class CacheBench : public KuduTest,
  public:
   void SetUp() override {
     KuduTest::SetUp();
-    cache_.reset(NewCache(kCacheCapacity, "test-cache"));
+    const BenchSetup& setup = GetParam();
+    if (setup.eviction_policy == Cache::EvictionPolicy::SLRU) {
+      cache_.reset(NewSLRUCache(kProbationarySegmentCapacity, 
kProtectedSegmentCapacity,
+                                "test-cache", kLookups));
+    } else {
+      cache_.reset(NewCache(kCacheCapacity, "test-cache"));
+    }
   }
 
   // Run queries against the cache until '*done' becomes true.
@@ -157,10 +180,14 @@ class CacheBench : public KuduTest,
 // Test both distributions, and for each, test both the case where the data
 // fits in the cache and where it is a bit larger.
 INSTANTIATE_TEST_SUITE_P(Patterns, CacheBench, 
testing::ValuesIn(std::vector<BenchSetup>{
-      {BenchSetup::Pattern::ZIPFIAN, 1.0},
-      {BenchSetup::Pattern::ZIPFIAN, 3.0},
-      {BenchSetup::Pattern::UNIFORM, 1.0},
-      {BenchSetup::Pattern::UNIFORM, 3.0}
+      {BenchSetup::Pattern::ZIPFIAN, 1.0, Cache::EvictionPolicy::LRU},
+      {BenchSetup::Pattern::ZIPFIAN, 1.0, Cache::EvictionPolicy::SLRU},
+      {BenchSetup::Pattern::ZIPFIAN, 3.0, Cache::EvictionPolicy::LRU},
+      {BenchSetup::Pattern::ZIPFIAN, 3.0, Cache::EvictionPolicy::SLRU},
+      {BenchSetup::Pattern::UNIFORM, 1.0, Cache::EvictionPolicy::LRU},
+      {BenchSetup::Pattern::UNIFORM, 1.0, Cache::EvictionPolicy::SLRU},
+      {BenchSetup::Pattern::UNIFORM, 3.0, Cache::EvictionPolicy::LRU},
+      {BenchSetup::Pattern::UNIFORM, 3.0, Cache::EvictionPolicy::SLRU}
     }));
 
 TEST_P(CacheBench, RunBench) {
diff --git a/src/kudu/util/cache.cc b/src/kudu/util/cache.cc
index c0156fe12..c4f09ff78 100644
--- a/src/kudu/util/cache.cc
+++ b/src/kudu/util/cache.cc
@@ -75,6 +75,8 @@ string ToString(Cache::EvictionPolicy p) {
       return "fifo";
     case Cache::EvictionPolicy::LRU:
       return "lru";
+    case Cache::EvictionPolicy::SLRU:
+      return "slru";
     default:
       LOG(FATAL) << "unexpected cache eviction policy: " << 
static_cast<int>(p);
       break;
diff --git a/src/kudu/util/cache.h b/src/kudu/util/cache.h
index 61d664d04..5392b54ca 100644
--- a/src/kudu/util/cache.h
+++ b/src/kudu/util/cache.h
@@ -53,6 +53,9 @@ class Cache {
 
     // The least-recently-used items are evicted.
     LRU,
+
+    // Segmented version of LRU.
+    SLRU,
   };
 
   // Callback interface which is called when an entry is evicted from the 
cache.
diff --git a/src/kudu/util/slru_cache-test.cc b/src/kudu/util/slru_cache-test.cc
new file mode 100644
index 000000000..773828c1b
--- /dev/null
+++ b/src/kudu/util/slru_cache-test.cc
@@ -0,0 +1,679 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/slru_cache.h"
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/util/cache.h"
+#include "kudu/util/coding.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_bool(cache_force_single_shard);
+
+namespace kudu {
+
+// Conversions between numeric keys/values and the types expected by Cache.
+static std::string EncodeInt(int k) {
+  faststring result;
+  PutFixed32(&result, k);
+  return result.ToString();
+}
+static int DecodeInt(const Slice& k) {
+  CHECK_EQ(4, k.size());
+  return DecodeFixed32(k.data());
+}
+
+// Cache sharding policy affects the composition of the cache. Some test
+// scenarios assume cache is single-sharded to keep the logic simpler.
+enum class ShardingPolicy {
+  MultiShard,
+  SingleShard,
+};
+
+constexpr size_t kSmallerSegmentSize = 4 * 1024 * 1024;
+constexpr size_t kLargerSegmentSize = 12 * 1024 * 1024;
+constexpr uint32_t kLookups = 5;
+
+class SLRUCacheBaseTest : public KuduTest,
+                          public Cache::EvictionCallback {
+ public:
+  SLRUCacheBaseTest(size_t probationary_segment_size,
+                    size_t protected_segment_size,
+                    uint32_t lookups)
+      : probationary_segment_size_(probationary_segment_size),
+        protected_segment_size_(protected_segment_size),
+        total_cache_size_(probationary_segment_size + protected_segment_size),
+        lookups_threshold_(lookups)
+        { }
+
+  // Implementation of the EvictionCallback interface.
+  void EvictedEntry(Slice key, Slice val) override {
+    evicted_keys_.push_back(DecodeInt(key));
+    evicted_values_.push_back(DecodeInt(val));
+  }
+
+  // Returns -1 if no key is found in cache.
+  // When inserting entries, don't use -1 as the value.
+  int Lookup(int key) {
+    auto handle(slru_cache_->Lookup(EncodeInt(key), Cache::EXPECT_IN_CACHE));
+    return handle ? DecodeInt(slru_cache_->Value(handle)) : -1;
+  }
+
+  void Insert(int key, int value, int charge = 1) {
+    std::string key_str = EncodeInt(key);
+    std::string val_str = EncodeInt(value);
+    auto handle(slru_cache_->Allocate(key_str, val_str.size(), charge));
+    CHECK(handle);
+    memcpy(slru_cache_->MutableValue(&handle), val_str.data(), val_str.size());
+    slru_cache_->Insert(std::move(handle), this);
+  }
+
+  void Erase(int key) {
+    slru_cache_->Erase(EncodeInt(key));
+  }
+
+  // Returns true if probationary segment contains key.
+  // Returns false if not.
+  bool ProbationaryContains(const Slice& key) {
+    const uint32_t hash = slru_cache_->HashSlice(key);
+    return 
slru_cache_->shards_[slru_cache_->Shard(hash)]->ProbationaryContains(key, hash);
+  }
+
+  // Returns true if protected segment contains key.
+  // Returns false if not.
+  bool ProtectedContains(const Slice& key) {
+    const uint32_t hash = slru_cache_->HashSlice(key);
+    return 
slru_cache_->shards_[slru_cache_->Shard(hash)]->ProtectedContains(key, hash);
+  }
+
+  protected:
+   void SetupWithParameters(Cache::MemoryType mem_type,
+                            ShardingPolicy sharding_policy) {
+     // Using single shard makes the logic of scenarios simple
+     // for capacity and eviction-related behavior.
+     FLAGS_cache_force_single_shard = (sharding_policy == 
ShardingPolicy::SingleShard);
+
+     if (mem_type == Cache::MemoryType::DRAM) {
+       
slru_cache_.reset(NewSLRUCache<Cache::MemoryType::DRAM>(probationary_segment_size_,
+                                                               
protected_segment_size_,
+                                                               
"slru_cache_test",
+                                                               
lookups_threshold_));
+     } else {
+       FAIL() << mem_type << ": unrecognized slru cache memory type";
+     }
+     MemTracker::FindTracker("slru_cache_test-sharded_slru_cache", 
&mem_tracker_);
+
+     ASSERT_TRUE(mem_tracker_.get());
+  }
+
+   const size_t probationary_segment_size_;
+   const size_t protected_segment_size_;
+   const size_t total_cache_size_;
+   const uint32_t lookups_threshold_;
+   std::vector<int> evicted_keys_;
+   std::vector<int> evicted_values_;
+   std::shared_ptr<MemTracker> mem_tracker_;
+   std::unique_ptr<ShardedSLRUCache> slru_cache_;
+};
+
+class SLRUCacheTest :
+    public SLRUCacheBaseTest,
+    public ::testing::WithParamInterface<std::pair<Cache::MemoryType, 
ShardingPolicy>> {
+
+ public:
+  SLRUCacheTest()
+      : SLRUCacheBaseTest(kSmallerSegmentSize, kLargerSegmentSize, kLookups) {
+}
+
+  void SetUp() override {
+    const auto& param = GetParam();
+    SetupWithParameters(param.first, param.second);
+  }
+};
+
+INSTANTIATE_TEST_SUITE_P(
+    CacheTypes, SLRUCacheTest,
+    ::testing::Values(
+        std::make_pair(Cache::MemoryType::DRAM,
+                       ShardingPolicy::MultiShard),
+        std::make_pair(Cache::MemoryType::DRAM,
+                       ShardingPolicy::SingleShard)));
+
+class SLRUSingleShardCacheTest :
+    public SLRUCacheBaseTest,
+    public ::testing::WithParamInterface<Cache::MemoryType> {
+
+ public:
+  SLRUSingleShardCacheTest()
+      : SLRUCacheBaseTest(kSmallerSegmentSize, kLargerSegmentSize, kLookups) {
+  }
+
+  void SetUp() override {
+    const auto& param = GetParam();
+    SetupWithParameters(param, ShardingPolicy::SingleShard);
+  }
+};
+
+INSTANTIATE_TEST_SUITE_P(
+    CacheTypes, SLRUSingleShardCacheTest,
+    ::testing::Values(Cache::MemoryType::DRAM));
+
+class SLRUSingleShardTest :
+    public SLRUCacheBaseTest,
+    public ::testing::WithParamInterface<Cache::MemoryType> {
+
+ public:
+  SLRUSingleShardTest()
+      : SLRUCacheBaseTest(kLargerSegmentSize, kSmallerSegmentSize, kLookups) {
+  }
+
+  void SetUp() override {
+    const auto& param = GetParam();
+    SetupWithParameters(param, ShardingPolicy::SingleShard);
+  }
+};
+
+INSTANTIATE_TEST_SUITE_P(
+    CacheTypes, SLRUSingleShardTest,
+    ::testing::Values(Cache::MemoryType::DRAM));
+
+// Tests that an entry is upgraded from probationary to protected segment
+// after being looked up more than lookups_threshold_ times.
+TEST_P(SLRUCacheTest, Upgrade) {
+  ASSERT_EQ(-1, Lookup(100));
+
+  Insert(100, 101);
+  ASSERT_EQ(101, Lookup(100));
+  ASSERT_EQ(-1,  Lookup(200));
+  ASSERT_EQ(-1,  Lookup(300));
+  ASSERT_TRUE(ProbationaryContains(EncodeInt(100)));
+
+  Insert(200, 201);
+  ASSERT_EQ(101, Lookup(100));
+  ASSERT_EQ(201, Lookup(200));
+  ASSERT_EQ(-1,  Lookup(300));
+  ASSERT_TRUE(ProbationaryContains(EncodeInt(200)));
+
+  Insert(100, 102);
+  ASSERT_EQ(102, Lookup(100));
+  ASSERT_EQ(201, Lookup(200));
+  ASSERT_EQ(-1,  Lookup(300));
+  ASSERT_TRUE(ProbationaryContains(EncodeInt(100)));
+
+  // Upgrade entry to protected segment.
+  for (auto i = 0; i < lookups_threshold_ - 1; ++i) {
+    ASSERT_EQ(102, Lookup(100));
+  }
+  ASSERT_FALSE(ProbationaryContains(EncodeInt(100)));
+  ASSERT_TRUE(ProtectedContains(EncodeInt(100)));
+
+  // Lookup entry multiple times in protected segment.
+  for (auto i = 0; i < 3 * lookups_threshold_; ++i) {
+    ASSERT_EQ(102, Lookup(100));
+  }
+  ASSERT_FALSE(ProbationaryContains(EncodeInt(100)));
+  ASSERT_TRUE(ProtectedContains(EncodeInt(100)));
+
+  // Check that entry in protected segment works with upsert case.
+  Insert(100, 103);
+  ASSERT_EQ(103, Lookup(100));
+  ASSERT_FALSE(ProbationaryContains(EncodeInt(100)));
+  ASSERT_TRUE(ProtectedContains(EncodeInt(100)));
+}
+
+// Tests that entries are properly erased from both the probationary and 
protected segments.
+TEST_P(SLRUCacheTest, Erase) {
+  Erase(200);
+  ASSERT_EQ(0, evicted_keys_.size());
+
+  Insert(100, 101);
+  Insert(200, 201);
+
+  // Erase first entry, verify it's removed from cache by checking eviction 
callback.
+  Erase(100);
+  ASSERT_EQ(-1,  Lookup(100));
+  ASSERT_EQ(201, Lookup(200));
+  ASSERT_EQ(1, evicted_keys_.size());
+  ASSERT_EQ(100, evicted_keys_[0]);
+  ASSERT_EQ(101, evicted_values_[0]);
+
+  Erase(100);
+  ASSERT_EQ(-1,  Lookup(100));
+  ASSERT_EQ(201, Lookup(200));
+  ASSERT_EQ(1, evicted_keys_.size());
+
+  // Upgrade second entry to protected segment.
+  for (auto i = 0; i < lookups_threshold_ - 1; ++i) {
+    ASSERT_EQ(201, Lookup(200));
+  }
+  ASSERT_FALSE(ProbationaryContains(EncodeInt(200)));
+  ASSERT_TRUE(ProtectedContains(EncodeInt(200)));
+
+  // Erase second entry from protected segment.
+  Erase(200);
+  ASSERT_EQ(2, evicted_keys_.size());
+  ASSERT_EQ(200, evicted_keys_[1]);
+  ASSERT_EQ(201, evicted_values_[1]);
+}
+
+// Underlying entry isn't actually deleted until handle around it from lookup 
is reset.
+TEST_P(SLRUCacheTest, EntriesArePinned) {
+  Insert(100, 101);
+  auto h1 = slru_cache_->Lookup(EncodeInt(100), Cache::EXPECT_IN_CACHE);
+  ASSERT_EQ(101, DecodeInt(slru_cache_->Value(h1)));
+
+  Insert(100, 102);
+  auto h2 = slru_cache_->Lookup(EncodeInt(100), Cache::EXPECT_IN_CACHE);
+  ASSERT_EQ(102, DecodeInt(slru_cache_->Value(h2)));
+  ASSERT_EQ(0, evicted_keys_.size());
+
+  h1.reset();
+  ASSERT_EQ(1, evicted_keys_.size());
+  ASSERT_EQ(100, evicted_keys_[0]);
+  ASSERT_EQ(101, evicted_values_[0]);
+
+  Erase(100);
+  ASSERT_EQ(-1, Lookup(100));
+  ASSERT_EQ(1, evicted_keys_.size());
+
+  h2.reset();
+  ASSERT_EQ(2, evicted_keys_.size());
+  ASSERT_EQ(100, evicted_keys_[1]);
+  ASSERT_EQ(102, evicted_values_[1]);
+}
+
+// Tests that a frequently accessed entry is not evicted.
+TEST_P(SLRUCacheTest, LRUEviction) {
+  static constexpr int kNumElems = 1000;
+  const int size_per_elem = static_cast<int>(total_cache_size_ / kNumElems);
+
+  Insert(100, 101);
+  Insert(200, 201);
+
+  // Loop adding and looking up new entries, but repeatedly accessing key 101.
+  // This frequently-used entry should not be evicted.
+  for (int i = 0; i < kNumElems + 1000; i++) {
+    Insert(1000+i, 2000+i, size_per_elem);
+    ASSERT_EQ(2000+i, Lookup(1000+i));
+    ASSERT_EQ(101, Lookup(100));
+  }
+  ASSERT_FALSE(ProbationaryContains(EncodeInt(100)));
+  ASSERT_TRUE(ProtectedContains(EncodeInt(100)));
+  ASSERT_EQ(101, Lookup(100));
+  // Since '200' wasn't accessed in the loop above, it should have been 
evicted.
+  ASSERT_EQ(-1, Lookup(200));
+}
+
+// Tests that entries are evicted from protected segment when it's at capacity.
+// Also tests the upsert case for a full protected segment.
+TEST_P(SLRUSingleShardCacheTest, Downgrade) {
+  const int weight = static_cast<int>(protected_segment_size_ / 100);
+  int added_weight = 0;
+  int i = 0;
+  const int delta = 100;
+  std::vector<int> keys;
+  std::vector<int> values;
+  // Add enough entries to fill protected segment to capacity.
+  while (added_weight < 0.99 * protected_segment_size_) {
+    Insert(i, delta + i, weight);
+    keys.emplace_back(i);
+    values.emplace_back(delta + i);
+    ASSERT_TRUE(ProbationaryContains(EncodeInt(i)));
+    ASSERT_FALSE(ProtectedContains(EncodeInt(i)));
+    // Upgrade to protected segment.
+    for (int j = 0; j < lookups_threshold_; ++j) {
+      ASSERT_EQ(delta + i, Lookup(i));
+    }
+    // Verify entry was upgraded.
+    ASSERT_FALSE(ProbationaryContains(EncodeInt(i)));
+    ASSERT_TRUE(ProtectedContains(EncodeInt(i)));
+    added_weight += weight;
+    ++i;
+  }
+  // Ensure that the same underlying entry is used when upgraded and eviction 
callback is not used.
+  ASSERT_TRUE(evicted_keys_.empty());
+  ASSERT_TRUE(evicted_values_.empty());
+
+  int added_weight1 = 0;
+  int k = 200;
+  int r = 0;              // Mirrors i from previous loop, so we can track 
evicted entries.
+  int evicted_index = 0;
+  // Add entries to probationary segment, upgrade them to protected segment 
thus
+  // kicking out entries from the protected segment since it's already at 
capacity.
+  while (added_weight1 < 0.5 * probationary_segment_size_) {
+    Insert(k, delta + k, weight);
+    ASSERT_TRUE(ProbationaryContains(EncodeInt(k)));
+    ASSERT_FALSE(ProtectedContains(EncodeInt(k)));
+    // Upgrade to protected segment, will kick out the LRU entry to make space.
+    for (int j = 0; j < lookups_threshold_; ++j) {
+      ASSERT_EQ(delta + k, Lookup(k));
+    }
+    // Verify current entry is upgraded.
+    ASSERT_FALSE(ProbationaryContains(EncodeInt(k)));
+    ASSERT_TRUE(ProtectedContains(EncodeInt(k)));
+    // Verify LRU entry from protected segment is evicted to probationary 
segment.
+    ASSERT_EQ(r, keys[evicted_index]);
+    ASSERT_EQ(delta + r, values[evicted_index]);
+    ASSERT_TRUE(ProbationaryContains(EncodeInt(r)));
+    ASSERT_FALSE(ProtectedContains(EncodeInt(r)));
+    added_weight1 += weight;
+    ++k, ++r, ++evicted_index;
+  }
+
+  // Upsert middle entry in full protected segment with larger weight to kick 
out
+  // the LRU entry to probationary segment.
+  int entry = 50;
+  // Assert LRU entry of protected segment.
+  ASSERT_FALSE(ProbationaryContains(EncodeInt(r)));
+  ASSERT_TRUE(ProtectedContains(EncodeInt(r)));
+  // Upsert middle entry of full protected segment to kick out LRU entry.
+  ASSERT_FALSE(ProbationaryContains(EncodeInt(entry)));
+  ASSERT_TRUE(ProtectedContains(EncodeInt(entry)));
+  ASSERT_EQ(delta + entry, Lookup(entry));
+  Insert(entry, delta * 2 + entry, static_cast<int>(1.25 * weight));
+  ASSERT_FALSE(ProbationaryContains(EncodeInt(entry)));
+  ASSERT_TRUE(ProtectedContains(EncodeInt(entry)));
+  ASSERT_EQ(delta * 2 + entry, Lookup(entry));
+  // Ensure LRU entry was evicted to probationary shard.
+  ASSERT_TRUE(ProbationaryContains(EncodeInt(r)));
+  ASSERT_FALSE(ProtectedContains(EncodeInt(r)));
+  ASSERT_EQ(delta + r, Lookup(r));
+}
+
+// Tests that the LRU entries from the probationary segment are evicted to 
make space for
+// any entries being downgraded from the protected segment. Ensure that only 
enough LRU entries to
+// make space for new entries will be evicted.
+TEST_P(SLRUSingleShardCacheTest, DowngradeEviction) {
+  const int weight = static_cast<int>(protected_segment_size_ / 100);
+  int added_weight = 0;
+  int i = 0;
+  const int delta = 100;
+  // Add enough entries to fill protected segment to capacity.
+  while (added_weight < 0.99 * protected_segment_size_) {
+    Insert(i, delta + i, weight);
+    ASSERT_TRUE(ProbationaryContains(EncodeInt(i)));
+    ASSERT_FALSE(ProtectedContains(EncodeInt(i)));
+    // Upgrade to protected segment.
+    for (int j = 0; j < lookups_threshold_; ++j) {
+      ASSERT_EQ(delta + i, Lookup(i));
+    }
+    // Verify entry was upgraded.
+    ASSERT_FALSE(ProbationaryContains(EncodeInt(i)));
+    ASSERT_TRUE(ProtectedContains(EncodeInt(i)));
+    added_weight += weight;
+    ++i;
+  }
+  // Ensure that the same underlying entry is used when upgraded and eviction 
callback is not used.
+  ASSERT_TRUE(evicted_keys_.empty());
+  ASSERT_TRUE(evicted_values_.empty());
+
+  const int probationary_weight = static_cast<int>(probationary_segment_size_ 
/ 100);
+  int probationary_added_weight = 0;
+  int j = 200;
+  std::vector<int> probationary_keys;
+  std::vector<int> probationary_values;
+  // Add enough entries to fill probationary segment to capacity.
+  while (probationary_added_weight < 0.99 * probationary_segment_size_) {
+    Insert(j, delta + j, probationary_weight);
+    probationary_keys.emplace_back(j);
+    probationary_values.emplace_back(delta + j);
+    ASSERT_TRUE(ProbationaryContains(EncodeInt(j)));
+    ASSERT_FALSE(ProtectedContains(EncodeInt(j)));
+    probationary_added_weight += probationary_weight;
+    ++j;
+  }
+
+
+  // Upgrade the most recent probationary entry.
+  auto last_key = probationary_keys.back();
+  auto last_value = probationary_values.back();
+  for (int k = 0; k < lookups_threshold_; ++k) {
+    ASSERT_EQ(last_value, Lookup(last_key));
+  }
+  ASSERT_FALSE(ProbationaryContains(EncodeInt(last_key)));
+  ASSERT_TRUE(ProtectedContains(EncodeInt(last_key)));
+
+
+  // Verify that the LRU entries from the probationary segment are evicted to 
make space for entry
+  // from protected segment being downgraded.
+  for (int i = 0; i < evicted_keys_.size(); ++i) {
+    ASSERT_EQ(probationary_keys[i], evicted_keys_[i]);
+    ASSERT_EQ(probationary_values[i], evicted_values_[i]);
+  }
+}
+
+// Test that entries in protected segment stay there after inserts larger than 
total cache size.
+TEST_P(SLRUSingleShardCacheTest, LongInserts) {
+  const int weight = static_cast<int>(protected_segment_size_ / 100);
+  int added_weight = 0;
+  int i = 0;
+  // Add enough entries to fill protected segment to capacity.
+  while (added_weight < 0.99 * protected_segment_size_) {
+    Insert(i, 100 + i, weight);
+    ASSERT_TRUE(ProbationaryContains(EncodeInt(i)));
+    ASSERT_FALSE(ProtectedContains(EncodeInt(i)));
+    // Upgrade to protected segment.
+    for (int j = 0; j < lookups_threshold_; ++j) {
+      ASSERT_EQ(100 + i, Lookup(i));
+    }
+    // Verify entry was upgraded.
+    ASSERT_FALSE(ProbationaryContains(EncodeInt(i)));
+    ASSERT_TRUE(ProtectedContains(EncodeInt(i)));
+    added_weight += weight;
+    ++i;
+  }
+
+  // Insert random entries, total weight will be larger than the entire 
cache's size.
+  int added_weight1 = 0;
+  int k = 200;
+  while (added_weight1 < 3 * total_cache_size_) {
+    Insert(k, 100 + k, weight);
+    ASSERT_TRUE(ProbationaryContains(EncodeInt(k)));
+    ASSERT_FALSE(ProtectedContains(EncodeInt(k)));
+    added_weight1 += weight;
+    ++k;
+  }
+
+  // Verify entries from protected segment were not affected from the inserts 
above.
+  for (int l = 0; l < 100; ++l) {
+    ASSERT_EQ(100 + l, Lookup(l));
+    ASSERT_FALSE(ProbationaryContains(EncodeInt(l)));
+    ASSERT_TRUE(ProtectedContains(EncodeInt(l)));
+  }
+}
+
+// Tests simple corner cases ensuring entries with size that's greater than 
the probationary
+// segment will not be inserted into the cache.
+TEST_P(SLRUSingleShardCacheTest, CornerCases) {
+  ASSERT_EQ(-1, Lookup(100));
+
+  Insert(100, 101, kSmallerSegmentSize + kLargerSegmentSize);
+  ASSERT_EQ(-1, Lookup(100));
+  ASSERT_FALSE(ProbationaryContains(EncodeInt(100)));
+
+  Insert(100, 101, static_cast<int>(1.01 * kSmallerSegmentSize));
+  ASSERT_EQ(-1, Lookup(100));
+  ASSERT_FALSE(ProbationaryContains(EncodeInt(100)));
+
+  Insert(100, 101, kSmallerSegmentSize);
+  ASSERT_EQ(101, Lookup(100));
+  ASSERT_TRUE(ProbationaryContains(EncodeInt(100)));
+}
+
+// Fills the probationary segment to capacity then inserts an entry with size 
greater
+// than the probationary segment thus evicting all prior entries from the 
probationary segment.
+TEST_P(SLRUSingleShardTest, CapacityCases) {
+  const int weight = static_cast<int>(probationary_segment_size_ / 100);
+  int added_weight = 0;
+  int i = 0;
+  const int delta = 100;
+  std::vector<int> keys;
+  std::vector<int> values;
+
+  // Add enough entries to fill probationary segment to capacity.
+  while (added_weight < 0.99 * probationary_segment_size_) {
+    Insert(i, delta + i, weight);
+    keys.emplace_back(i);
+    values.emplace_back(delta + i);
+    ASSERT_TRUE(ProbationaryContains(EncodeInt(i)));
+    ASSERT_FALSE(ProtectedContains(EncodeInt(i)));
+    added_weight += weight;
+    ++i;
+  }
+
+  ASSERT_EQ(100, keys.size());
+  ASSERT_EQ(100, values.size());
+  ASSERT_EQ(0, evicted_keys_.size());
+  ASSERT_EQ(0, evicted_values_.size());
+
+  for (int j = 0; j < keys.size(); ++j) {
+    ASSERT_TRUE(ProbationaryContains(EncodeInt(keys[j])));
+    ASSERT_EQ(values[j], Lookup(keys[j]));
+  }
+
+  // Insert entry whose size is equal to capacity of the probationary segment, 
this entry's
+  // insertion will force the eviction of all existing entries in the 
probationary segment.
+  Insert(200, 201, probationary_segment_size_);
+  ASSERT_EQ(201, Lookup(200));
+  ASSERT_TRUE(ProbationaryContains(EncodeInt(200)));
+
+  ASSERT_EQ(100, evicted_keys_.size());
+  ASSERT_EQ(100, evicted_values_.size());
+
+  // Verify that the entries in the probationary segment prior to the insertion
+  // of the entry above are evicted from the cache.
+  for (int j = 0; j < evicted_keys_.size(); j++) {
+    ASSERT_FALSE(ProbationaryContains(EncodeInt(keys[j])));
+    ASSERT_EQ(-1, Lookup(keys[j]));
+    ASSERT_EQ(keys[j], evicted_keys_[j]);
+    ASSERT_EQ(values[j], evicted_values_[j]);
+  }
+}
+
+// Tests the case where an entry with size greater than the protected segment 
is not
+// upgraded from the probationary segment and any entries in the protected 
segment are unaffected.
+TEST_P(SLRUSingleShardTest, EdgeCase) {
+  const int weight = static_cast<int>(protected_segment_size_ / 100);
+  int added_weight = 0;
+  int i = 0;
+  const int delta = 100;
+  std::vector<int> keys;
+  std::vector<int> values;
+
+  // Add enough entries to fill protected segment to capacity.
+  while (added_weight < 0.99 * protected_segment_size_) {
+    Insert(i, delta + i, weight);
+    keys.emplace_back(i);
+    values.emplace_back(delta + i);
+    ASSERT_TRUE(ProbationaryContains(EncodeInt(i)));
+    ASSERT_FALSE(ProtectedContains(EncodeInt(i)));
+    // Upgrade to protected segment.
+    for (int j = 0; j < lookups_threshold_; ++j) {
+      ASSERT_EQ(delta + i, Lookup(i));
+    }
+    // Verify entry was upgraded.
+    ASSERT_FALSE(ProbationaryContains(EncodeInt(i)));
+    ASSERT_TRUE(ProtectedContains(EncodeInt(i)));
+    added_weight += weight;
+    ++i;
+  }
+
+  // Insert entry whose size is equal to capacity of the probationary segment.
+  Insert(200, 201, probationary_segment_size_);
+  ASSERT_EQ(201, Lookup(200));
+  ASSERT_TRUE(ProbationaryContains(EncodeInt(200)));
+
+  // Attempt to upgrade larger entry to the protected segment.
+  for (int k = 0; k < lookups_threshold_; k++) {
+    ASSERT_EQ(201, Lookup(200));
+  }
+  // Verify entry is not upgraded since its size is larger
+  // than the capacity of the protected segment.
+  ASSERT_TRUE(ProbationaryContains(EncodeInt(200)));
+  ASSERT_FALSE(ProtectedContains(EncodeInt(200)));
+
+  // Verify entries in protected segment were not affected
+  // by attempted upgrade of large entry.
+  for (int j = 0; j < keys.size(); j++) {
+    ASSERT_FALSE(ProbationaryContains(EncodeInt(keys[j])));
+    ASSERT_TRUE(ProtectedContains(EncodeInt(keys[j])));
+    ASSERT_EQ(values[j], Lookup(keys[j]));
+  }
+}
+
+// Insert entries with same key but different weights for
+// both the probationary and protected segments.
+TEST_P(SLRUSingleShardTest, InsertAndLookupPatterns) {
+  std::vector<int> weights = {10, 3, 2, 1};
+  int key = 0;
+  const int value = 100;
+  const int delta = 10;
+
+  // Insert entry with same key in probationary segment with different weights.
+  for (auto j = 0; j < weights.size(); ++j) {
+    const int weight = static_cast<int>(probationary_segment_size_ / 
weights[j]);
+    Insert(key, value, weight);
+    ASSERT_EQ(value, Lookup(key));
+    ASSERT_TRUE(ProbationaryContains(EncodeInt(key)));
+    ASSERT_FALSE(ProtectedContains(EncodeInt(key)));
+
+    // Upsert case in the probationary segment.
+    Insert(key, value + delta, weight);
+    ASSERT_EQ(value + delta, Lookup(key));
+    ASSERT_TRUE(ProbationaryContains(EncodeInt(key)));
+    ASSERT_FALSE(ProtectedContains(EncodeInt(key)));
+    Erase(key);
+  }
+
+  // Insert entry with same key in protected segment with different weights.
+  for (auto j = 0; j < weights.size(); ++j) {
+    const int weight = static_cast<int>(protected_segment_size_ / weights[j]);
+    Insert(key, value, weight);
+    ASSERT_EQ(value, Lookup(key));
+    ASSERT_TRUE(ProbationaryContains(EncodeInt(key)));
+    ASSERT_FALSE(ProtectedContains(EncodeInt(key)));
+
+    // Upgrade entry to the protected segment.
+    for (auto j = 0; j < lookups_threshold_; ++j) {
+      ASSERT_EQ(value, Lookup(key));
+    }
+    ASSERT_FALSE(ProbationaryContains(EncodeInt(key)));
+    ASSERT_TRUE(ProtectedContains(EncodeInt(key)));
+
+    // Upsert case in the protected segment.
+    Insert(key, value + delta, weight);
+    ASSERT_EQ(value + delta, Lookup(key));
+    ASSERT_FALSE(ProbationaryContains(EncodeInt(key)));
+    ASSERT_TRUE(ProtectedContains(EncodeInt(key)));
+    Erase(key);
+  }
+}
+
+} // namespace kudu
diff --git a/src/kudu/util/slru_cache.cc b/src/kudu/util/slru_cache.cc
new file mode 100644
index 000000000..f8aafb6ae
--- /dev/null
+++ b/src/kudu/util/slru_cache.cc
@@ -0,0 +1,488 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/slru_cache.h"
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/bits.h"
+#include "kudu/gutil/hash/city.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/sysinfo.h"
+#include "kudu/util/alignment.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/malloc.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/slice.h"
+
+DECLARE_bool(cache_force_single_shard);
+DECLARE_double(cache_memtracker_approximation_ratio);
+
+using std::vector;
+using Handle = kudu::Cache::Handle;
+using EvictionCallback = kudu::Cache::EvictionCallback;
+
+namespace kudu {
+
+SLRUCacheShard::SLRUCacheShard(MemTracker* tracker, size_t capacity)
+    : capacity_(capacity),
+      usage_(0),
+      mem_tracker_(tracker) {
+  max_deferred_consumption_ = capacity * 
FLAGS_cache_memtracker_approximation_ratio;
+  // Make empty circular linked list.
+  rl_.next = &rl_;
+  rl_.prev = &rl_;
+}
+
+SLRUCacheShard::~SLRUCacheShard() {
+  for (SLRUHandle* e = rl_.next; e != &rl_; ) {
+    SLRUHandle* next = e->next;
+    DCHECK_EQ(e->refs.load(std::memory_order_relaxed), 1)
+      << "caller has an unreleased handle";
+    if (Unref(e)) {
+      FreeEntry(e);
+    }
+    e = next;
+  }
+  mem_tracker_->Consume(deferred_consumption_);
+}
+
+bool SLRUCacheShard::Unref(SLRUHandle* e) {
+  DCHECK_GT(e->refs.load(std::memory_order_relaxed), 0);
+  return e->refs.fetch_sub(1) == 1;
+}
+
+void SLRUCacheShard::FreeEntry(SLRUHandle* e) {
+  DCHECK_EQ(e->refs.load(std::memory_order_relaxed), 0);
+  if (e->eviction_callback) {
+    e->eviction_callback->EvictedEntry(e->key(), e->value());
+  }
+  UpdateMemTracker(-static_cast<int64_t>(e->charge));
+  delete [] e;
+}
+
+void SLRUCacheShard::SoftFreeEntry(SLRUHandle* e) {
+  UpdateMemTracker(-static_cast<int64_t>(e->charge));
+}
+
+void SLRUCacheShard::UpdateMemTracker(int64_t delta) {
+  int64_t old_deferred = deferred_consumption_.fetch_add(delta);
+  int64_t new_deferred = old_deferred + delta;
+
+  if (new_deferred > max_deferred_consumption_ ||
+      new_deferred < -max_deferred_consumption_) {
+    int64_t to_propagate = deferred_consumption_.exchange(0, 
std::memory_order_relaxed);
+    mem_tracker_->Consume(to_propagate);
+  }
+}
+
+void SLRUCacheShard::RL_Remove(SLRUHandle* e) {
+  e->next->prev = e->prev;
+  e->prev->next = e->next;
+  DCHECK_GE(usage_, e->charge);
+  usage_ -= e->charge;
+}
+
+void SLRUCacheShard::RL_Append(SLRUHandle* e) {
+  // Make "e" newest entry by inserting just before rl_.
+  e->next = &rl_;
+  e->prev = rl_.prev;
+  e->prev->next = e;
+  e->next->prev = e;
+  usage_ += e->charge;
+}
+
+void SLRUCacheShard::RL_UpdateAfterLookup(SLRUHandle* e) {
+  RL_Remove(e);
+  RL_Append(e);
+}
+
+// No mutex is needed here since all the SLRUCacheShardPair methods that 
access the underlying
+// shards and its tables are protected by mutexes. Same logic applies to the 
all the below
+// methods in SLRUCacheShard.
+Handle* SLRUCacheShard::Lookup(const Slice& key, uint32_t hash, bool caching) {
+  SLRUHandle* e = table_.Lookup(key, hash);
+  if (e != nullptr) {
+    e->refs.fetch_add(1, std::memory_order_relaxed);
+    e->lookups++;
+    RL_UpdateAfterLookup(e);
+  }
+
+  return reinterpret_cast<Handle*>(e);
+}
+
+bool SLRUCacheShard::Contains(const Slice& key, uint32_t hash) {
+  return table_.Lookup(key, hash) != nullptr;
+}
+
+void SLRUCacheShard::Release(Handle* handle) {
+  SLRUHandle* e = reinterpret_cast<SLRUHandle*>(handle);
+  // If this is the last reference of the handle, the entry will be freed.
+  if (Unref(e)) {
+    FreeEntry(e);
+  }
+}
+
+Handle* SLRUCacheShard::Insert(SLRUHandle* handle,
+                               EvictionCallback* eviction_callback) {
+  // Set the remaining SLRUHandle members which were not already allocated 
during Allocate().
+  handle->eviction_callback = eviction_callback;
+  // Two refs for the handle: one from SLRUCacheShard, one for the returned 
handle.
+  handle->refs.store(2, std::memory_order_relaxed);
+  UpdateMemTracker(handle->charge);
+
+  RL_Append(handle);
+
+  SLRUHandle* old_entry = table_.Insert(handle);
+  if (old_entry != nullptr) {
+    RL_Remove(old_entry);
+    if (Unref(old_entry)) {
+      FreeEntry(old_entry);
+    }
+  }
+
+  while (usage_ > capacity_ && rl_.next != &rl_) {
+    SLRUHandle* old = rl_.next;
+    RL_Remove(old);
+    table_.Remove(old->key(), old->hash);
+    if (Unref(old)) {
+      FreeEntry(old);
+    }
+  }
+
+  return reinterpret_cast<Handle*>(handle);
+}
+
+vector<SLRUHandle*> SLRUCacheShard::InsertAndReturnEvicted(SLRUHandle* handle) 
{
+  handle->refs.fetch_add(1, std::memory_order_relaxed);
+  handle->in_protected_segment.store(true, std::memory_order_relaxed);
+  UpdateMemTracker(handle->charge);
+
+  vector<SLRUHandle*> evicted_entries;
+  handle->Sanitize();
+  RL_Append(handle);
+
+  // No entries should exist with same key in protected segment when upgrading.
+  SLRUHandle* old_entry = table_.Insert(handle);
+  DCHECK(old_entry == nullptr);
+
+  while (usage_ > capacity_ && rl_.next != &rl_) {
+    SLRUHandle* old = rl_.next;
+    RL_Remove(old);
+    table_.Remove(old->key(), old->hash);
+    Unref(old);
+    SoftFreeEntry(old);
+    evicted_entries.emplace_back(old);
+  }
+
+  return evicted_entries;
+}
+
+Handle* SLRUCacheShard::ProtectedInsert(SLRUHandle* handle,
+                                        EvictionCallback* eviction_callback,
+                                        vector<SLRUHandle*>* evictions) {
+  handle->eviction_callback = eviction_callback;
+  handle->refs.store(2, std::memory_order_relaxed);
+  UpdateMemTracker(handle->charge);
+
+  RL_Append(handle);
+
+  // Upsert case so Insert should return a non-null entry.
+  SLRUHandle* old_entry = table_.Insert(handle);
+  DCHECK(old_entry != nullptr);
+  RL_Remove(old_entry);
+  if (Unref(old_entry)) {
+    FreeEntry(old_entry);
+  }
+
+  while (usage_ > capacity_ && rl_.next != &rl_) {
+    SLRUHandle* old = rl_.next;
+    RL_Remove(old);
+    table_.Remove(old->key(), old->hash);
+    Unref(old);
+    SoftFreeEntry(old);
+    evictions->emplace_back(old);
+  }
+
+  return reinterpret_cast<Handle*>(handle);
+}
+
+void SLRUCacheShard::ReInsert(SLRUHandle* handle) {
+  handle->refs.fetch_add(1, std::memory_order_relaxed);
+  handle->in_protected_segment.store(false, std::memory_order_relaxed);
+  UpdateMemTracker(handle->charge);
+
+  handle->Sanitize();
+  RL_Append(handle);
+
+  // No entries should exist with same key in probationary segment when 
downgrading.
+  SLRUHandle* old_entry = table_.Insert(handle);
+  DCHECK(old_entry == nullptr);
+
+  while (usage_ > capacity_ && rl_.next != &rl_) {
+    SLRUHandle* old = rl_.next;
+    RL_Remove(old);
+    table_.Remove(old->key(), old->hash);
+    if (Unref(old)) {
+      FreeEntry(old);
+    }
+  }
+}
+
+void SLRUCacheShard::Erase(const Slice& key, uint32_t hash) {
+  SLRUHandle* e = table_.Remove(key, hash);
+  if (e != nullptr) {
+    RL_Remove(e);
+    // Free entry if this is the last reference.
+    if (Unref(e)) {
+      FreeEntry(e);
+    }
+  }
+}
+
+void SLRUCacheShard::SoftErase(const Slice& key, uint32_t hash) {
+  SLRUHandle* e = table_.Remove(key, hash);
+  if (e != nullptr) {
+    RL_Remove(e);
+    Unref(e);
+    SoftFreeEntry(e);
+  }
+}
+
+SLRUCacheShardPair::SLRUCacheShardPair(MemTracker* mem_tracker,
+                                       size_t probationary_capacity,
+                                       size_t protected_capacity,
+                                       uint32_t lookups) :
+    probationary_shard_(SLRUCacheShard(mem_tracker, probationary_capacity)),
+    protected_shard_(SLRUCacheShard(mem_tracker, protected_capacity)),
+    lookups_threshold_(lookups) {
+}
+
+// Commit a prepared entry into the probationary segment if entry does not 
exist or if it
+// exists in the probationary segment (upsert case).
+// If entry exists in protected segment, entry will be upserted and any 
evicted entries will
+// be properly downgraded to the probationary segment.
+// Look at Cache::Insert() for more details.
+Handle* SLRUCacheShardPair::Insert(SLRUHandle* handle,
+                                   EvictionCallback* eviction_callback) {
+  std::lock_guard<decltype(mutex_)> l(mutex_);
+  if (!ProtectedContains(handle->key(), handle->hash)) {
+    return probationary_shard_.Insert(handle, eviction_callback);
+  }
+  // If newly inserted entry has greater charge than previous one,
+  // possible that entries can be evicted if at capacity.
+  vector<SLRUHandle*> evictions;
+  auto* inserted = protected_shard_.ProtectedInsert(handle, eviction_callback, 
&evictions);
+  for (auto it = evictions.begin(); it != evictions.end(); ++it) {
+    SLRUHandle* evicted_entry = *it;
+    probationary_shard_.ReInsert(evicted_entry);
+  }
+  return inserted;
+}
+
+Handle* SLRUCacheShardPair::Lookup(const Slice& key, uint32_t hash, bool 
caching) {
+  // Lookup protected segment:
+  //  - Hit: Return handle.
+  //  - Miss: Lookup probationary segment
+  //      - Hit: If the number of lookups is < than 'lookups_threshold_', 
return the lookup handle.
+  //             If the number of lookups is >= than 'lookups_threshold_', 
upgrade the entry:
+  //                Erase entry from the probationary segment and insert entry 
into the protected
+  //                segment. Return the lookup handle. If any entries are 
evicted from the
+  //                protected segment, insert them into the probationary 
segment.
+  //      - Miss: Return the handle.
+  //
+  std::lock_guard<decltype(mutex_)> l(mutex_);
+  Handle* protected_handle = protected_shard_.Lookup(key, hash, caching);
+
+  // If the key exists in the protected segment, return the result from the 
lookup of the
+  // protected segment.
+  if (protected_handle) {
+    return protected_handle;
+  }
+  Handle* probationary_handle = probationary_shard_.Lookup(key, hash, caching);
+
+  // Return null handle if handle is not found in either the probationary or 
protected segment.
+  if (!probationary_handle) {
+    return probationary_handle;
+  }
+  auto* val_handle = reinterpret_cast<SLRUHandle*>(probationary_handle);
+  // If the number of lookups for entry isn't at the minimum number required 
before
+  // upgrading to the protected segment, return the entry found in 
probationary segment.
+  // If the entry's charge is larger than the protected segment's capacity, 
return entry found
+  // in probationary segment to avoid evicting any entries in the protected 
segment.
+  if (val_handle->lookups < lookups_threshold_ ||
+      val_handle->charge > protected_shard_.capacity()) {
+    return probationary_handle;
+  }
+
+  // Upgrade from the probationary segment.
+  // Erase entry from the probationary segment then add entry to the protected 
segment.
+  probationary_shard_.SoftErase(key, hash);
+  vector<SLRUHandle*> evictions = 
protected_shard_.InsertAndReturnEvicted(val_handle);
+
+  // Go through all evicted entries from the protected segment and insert them 
into
+  // the probationary segment. Insert the LRU entries from the protected 
segment first.
+  for (auto it = evictions.begin(); it != evictions.end(); ++it) {
+    SLRUHandle* evicted_entry = *it;
+    probationary_shard_.ReInsert(evicted_entry);
+  }
+  return probationary_handle;
+}
+
+void SLRUCacheShardPair::Release(Handle* handle) {
+  SLRUHandle* e = reinterpret_cast<SLRUHandle*>(handle);
+
+  // Release from either the probationary or the protected shard.
+  if (!e->in_protected_segment.load(std::memory_order_relaxed)) {
+    probationary_shard_.Release(handle);
+  } else {
+    protected_shard_.Release(handle);
+  }
+}
+
+void SLRUCacheShardPair::Erase(const Slice& key, uint32_t hash) {
+  std::lock_guard<decltype(mutex_)> l(mutex_);
+  probationary_shard_.Erase(key, hash);
+  protected_shard_.Erase(key, hash);
+}
+
+bool SLRUCacheShardPair::ProbationaryContains(const Slice& key, uint32_t hash) 
{
+  return probationary_shard_.Contains(key, hash);
+}
+
+bool SLRUCacheShardPair::ProtectedContains(const Slice& key, uint32_t hash) {
+  return protected_shard_.Contains(key, hash);
+}
+
+ShardedSLRUCache::ShardedSLRUCache(size_t probationary_capacity, size_t 
protected_capacity,
+                                   const std::string& id, const uint32_t 
lookups)
+    : shard_bits_(DetermineShardBits()) {
+  // A cache is often a singleton, so:
+  // 1. We reuse its MemTracker if one already exists, and
+  // 2. It is directly parented to the root MemTracker.
+  mem_tracker_ = MemTracker::FindOrCreateGlobalTracker(
+      -1, strings::Substitute("$0-sharded_slru_cache", id));
+
+  CHECK_GT(lookups, 0);
+  int num_shards = 1 << shard_bits_;
+  const size_t per_probationary_shard = (probationary_capacity + (num_shards - 
1)) / num_shards;
+  const size_t per_protected_shard = (protected_capacity + (num_shards - 1)) / 
num_shards;
+  for (auto s = 0; s < num_shards; ++s) {
+    shards_.emplace_back(new SLRUCacheShardPair(mem_tracker_.get(), 
per_probationary_shard,
+                                                per_protected_shard, lookups));
+  }
+}
+
+Cache::UniquePendingHandle ShardedSLRUCache::Allocate(Slice key, int val_len, 
int charge) {
+  int key_len = key.size();
+  DCHECK_GE(key_len, 0);
+  DCHECK_GE(val_len, 0);
+  int key_len_padded = KUDU_ALIGN_UP(key_len, sizeof(void*));
+  UniquePendingHandle h(reinterpret_cast<PendingHandle*>(
+                            new uint8_t[sizeof(SLRUHandle)
+                                + key_len_padded + val_len // the kv_data VLA 
data
+                                - 1 // (the VLA has a 1-byte placeholder)
+                            ]),
+                        PendingHandleDeleter(this));
+  SLRUHandle* handle = reinterpret_cast<SLRUHandle*>(h.get());
+  handle->lookups = 0;
+  handle->in_protected_segment.store(false, std::memory_order_relaxed);
+  handle->key_length = key_len;
+  handle->val_length = val_len;
+  // TODO(KUDU-1091): account for the footprint of structures used by Cache's
+  //                  internal housekeeping (RL handles, etc.) in case of
+  //                  non-automatic charge.
+  handle->charge = (charge == kAutomaticCharge) ? 
kudu_malloc_usable_size(h.get()) : charge;
+  handle->hash = HashSlice(key);
+  memcpy(handle->kv_data, key.data(), key_len);
+
+  return h;
+}
+
+Cache::UniqueHandle ShardedSLRUCache::Lookup(const Slice& key, CacheBehavior 
caching) {
+  const uint32_t hash = HashSlice(key);
+  return UniqueHandle(
+      shards_[Shard(hash)]->Lookup(key, hash, caching == EXPECT_IN_CACHE),
+      HandleDeleter(this));
+}
+
+void ShardedSLRUCache::Erase(const Slice& key)  {
+  const uint32_t hash = HashSlice(key);
+  shards_[Shard(hash)]->Erase(key, hash);
+}
+
+Slice ShardedSLRUCache::Value(const UniqueHandle& handle) const {
+  return reinterpret_cast<const SLRUHandle*>(handle.get())->value();
+}
+
+uint8_t* ShardedSLRUCache::MutableValue(UniquePendingHandle* handle) {
+  return reinterpret_cast<SLRUHandle*>(handle->get())->mutable_val_ptr();
+}
+
+Cache::UniqueHandle ShardedSLRUCache::Insert(UniquePendingHandle handle,
+                                             EvictionCallback* 
eviction_callback)  {
+  SLRUHandle* h = 
reinterpret_cast<SLRUHandle*>(DCHECK_NOTNULL(handle.release()));
+  return UniqueHandle(shards_[Shard(h->hash)]->Insert(h, eviction_callback),
+                      HandleDeleter(this));
+}
+
+void ShardedSLRUCache::Release(Handle* handle) {
+  SLRUHandle* h = reinterpret_cast<SLRUHandle*>(handle);
+  shards_[Shard(h->hash)]->Release(handle);
+}
+
+void ShardedSLRUCache::Free(PendingHandle* h) {
+  delete [] reinterpret_cast<uint8_t*>(h);
+}
+
+// Determine the number of bits of the hash that should be used to determine
+// the cache shard. This, in turn, determines the number of shards.
+int ShardedSLRUCache::DetermineShardBits() {
+  int bits = PREDICT_FALSE(FLAGS_cache_force_single_shard) ?
+             0 : Bits::Log2Ceiling(base::NumCPUs());
+  VLOG(1) << "Will use " << (1 << bits) << " shards for SLRU cache.";
+  return bits;
+}
+
+uint32_t ShardedSLRUCache::HashSlice(const Slice& s) {
+  return util_hash::CityHash64(reinterpret_cast<const char *>(s.data()), 
s.size());
+}
+
+uint32_t ShardedSLRUCache::Shard(uint32_t hash) const {
+  // Widen to uint64 before shifting, or else on a single CPU,
+  // we would try to shift a uint32_t by 32 bits, which is undefined.
+  return static_cast<uint64_t>(hash) >> (32 - shard_bits_);
+}
+
+template<>
+ShardedSLRUCache* NewSLRUCache<Cache::MemoryType::DRAM>(size_t 
probationary_capacity,
+                                                        size_t 
protected_capacity,
+                                                        const std::string& id,
+                                                        const uint32_t 
lookups) {
+  return new ShardedSLRUCache(probationary_capacity, protected_capacity, id, 
lookups);
+}
+
+} // namespace kudu
diff --git a/src/kudu/util/slru_cache.h b/src/kudu/util/slru_cache.h
new file mode 100644
index 000000000..aab328245
--- /dev/null
+++ b/src/kudu/util/slru_cache.h
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/alignment.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/slice.h"
+
+using Handle = kudu::Cache::Handle;
+using EvictionCallback = kudu::Cache::EvictionCallback;
+
+namespace kudu {
+// An SLRU cache contains two internal caches, the probationary and protected 
segments both with
+// their own separate configurable capacities. An SLRU cache is scan resistant 
as it protects the
+// cache from infrequent long scans larger than the cache or a long series of 
small infrequent scans
+// close to the scan's capacity. It does this by promoting entries from the 
probationary segment to
+// the protected segment after a configurable amount of lookups. Any random 
scan would then only
+// evict entries from the probationary segment. Any entries evicted from the 
protected segment as a
+// result of upgrade of entries would then be inserted into the MRU end of the 
probationary segment.
+
+class MemTracker;
+struct CacheMetrics;
+
+struct SLRUHandle {
+  EvictionCallback* eviction_callback;
+  SLRUHandle* next_hash;
+  SLRUHandle* next;
+  SLRUHandle* prev;
+  size_t charge;      // TODO(opt): Only allow uint32_t?
+  uint32_t key_length;
+  uint32_t val_length;
+  std::atomic<int32_t> refs;
+  uint32_t hash;      // Hash of key(); used for fast sharding and comparisons
+
+  // Number of lookups for this entry. Used for upgrading to protected segment 
in SLRU cache.
+  // Resets to 0 when moved between probationary and protected segments in 
both directions.
+  uint32_t lookups;
+
+  // True if entry is in protected segment, false if not.
+  // Used for releasing from the right shard in the SLRU cache implementation.
+  std::atomic<bool> in_protected_segment;
+
+  // The storage for the key/value pair itself. The data is stored as:
+  //   [key bytes ...] [padding up to 8-byte boundary] [value bytes ...]
+  uint8_t kv_data[1];   // Beginning of key/value pair
+
+  Slice key() const {
+    return Slice(kv_data, key_length);
+  }
+
+  uint8_t* mutable_val_ptr() {
+    int val_offset = KUDU_ALIGN_UP(key_length, sizeof(void*));
+    return &kv_data[val_offset];
+  }
+
+  const uint8_t* val_ptr() const {
+    return const_cast<SLRUHandle*>(this)->mutable_val_ptr();
+  }
+
+  Slice value() const {
+    return Slice(val_ptr(), val_length);
+  }
+
+  // Clears fields when moving between probationary and protected segments.
+  // Used in SLRU cache implementation.
+  void Sanitize() {
+    lookups = 0;
+    prev = nullptr;
+    next = nullptr;
+    next_hash = nullptr;
+  }
+};
+// TODO(mreddy): Add an extra layer of inheritance to SLRUCacheShard for 
ProtectedShard and
+// ProbationaryShard so segment-specific methods can be divided accordingly.
+// A single shard of sharded SLRU cache.
+class SLRUCacheShard {
+ public:
+  SLRUCacheShard(MemTracker* tracker, size_t capacity);
+  ~SLRUCacheShard();
+
+  size_t capacity() const {
+    return capacity_;
+  }
+
+  // Inserts handle into shard and removes any entries if past capacity.
+  Handle* Insert(SLRUHandle* handle, EvictionCallback* eviction_callback);
+  // Same as Insert but also returns any evicted entries.
+  // Used when upgrading entry to protected segment.
+  std::vector<SLRUHandle*> InsertAndReturnEvicted(SLRUHandle* handle);
+  // Same as InsertAndReturnEvicted but it returns the inserted handle too.
+  // Used for upsert case in protected segment.
+  Handle* ProtectedInsert(SLRUHandle* handle,
+                          EvictionCallback* eviction_callback,
+                          std::vector<SLRUHandle*>* evictions);
+  // Like SLRUCache::Lookup, but with an extra "hash" parameter.
+  Handle* Lookup(const Slice& key, uint32_t hash, bool caching);
+  // Reduces the entry's ref by one, frees the entry if no refs are remaining.
+  void Release(Handle* handle);
+  // Removes entry from shard, frees the entry if no refs are remaining.
+  void Erase(const Slice& key, uint32_t hash);
+  // Like Erase, but underlying entry is not freed.
+  // Necessary when upgrading entry to protected segment.
+  void SoftErase(const Slice& key, uint32_t hash);
+  // Returns true if shard contains entry, false if not.
+  bool Contains(const Slice& key, uint32_t hash);
+  // Like Insert but sets refs to 1 and no possibility for upsert case.
+  // Used when evicted entries from protected segment are being added to 
probationary segment.
+  void ReInsert(SLRUHandle* handle);
+
+ private:
+  void RL_Remove(SLRUHandle* e);
+  void RL_Append(SLRUHandle* e);
+  // Update the recency list after a lookup operation.
+  void RL_UpdateAfterLookup(SLRUHandle* e);
+  // Just reduce the reference count by 1.
+  // Return true if last reference
+  static bool Unref(SLRUHandle* e);
+  // Call the user's eviction callback, if it exists, and free the entry.
+  void FreeEntry(SLRUHandle* e);
+  // Updates memtracker to reflect entry being erased from cache.
+  // Unlike FreeEntry(), the eviction callback is not called and the entry is 
not freed.
+  void SoftFreeEntry(SLRUHandle* e);
+
+
+  // Update the memtracker's consumption by the given amount.
+  //
+  // This "buffers" the updates locally in 'deferred_consumption_' until the 
amount
+  // of accumulated delta is more than ~1% of the cache capacity. This improves
+  // performance under workloads with high eviction rates for a few reasons:
+  //
+  // 1) once the cache reaches its full capacity, we expect it to remain there
+  // in steady state. Each insertion is usually matched by an eviction, and 
unless
+  // the total size of the evicted item(s) is much different than the size of 
the
+  // inserted item, each eviction event is unlikely to change the total cache 
usage
+  // much. So, we expect that the accumulated error will mostly remain around 0
+  // and we can avoid propagating changes to the MemTracker at all.
+  //
+  // 2) because the cache implementation is sharded, we do this tracking in a 
bunch
+  // of different locations, avoiding bouncing cache-lines between cores. By 
contrast
+  // the MemTracker is a simple integer, so it doesn't scale as well under 
concurrency.
+  //
+  // Positive delta indicates an increased memory consumption.
+  void UpdateMemTracker(int64_t delta);
+
+  // Initialized before use.
+  const size_t capacity_;
+
+  size_t usage_;
+
+  // Dummy head of recency list.
+  // rl.prev is newest entry, rl.next is oldest entry.
+  SLRUHandle rl_;
+
+  Cache::HandleTable<SLRUHandle> table_;
+
+  MemTracker* mem_tracker_;
+  std::atomic<int64_t> deferred_consumption_ { 0 };
+
+  // Initialized based on capacity_ to ensure an upper bound on the error on 
the
+  // MemTracker consumption.
+  int64_t max_deferred_consumption_;
+
+  DISALLOW_COPY_AND_ASSIGN(SLRUCacheShard);
+};
+
+// Wrapper class for SLRUCacheShard.
+class SLRUCacheShardPair {
+ public:
+  SLRUCacheShardPair(MemTracker* mem_tracker,
+                     size_t probationary_capacity,
+                     size_t protected_capacity,
+                     uint32_t lookups);
+
+  Handle* Insert(SLRUHandle* handle, EvictionCallback* eviction_callback);
+  // Like Cache::Lookup, but with an extra "hash" parameter.
+  Handle* Lookup(const Slice& key, uint32_t hash, bool caching);
+  void Release(Handle* handle);
+  void Erase(const Slice& key, uint32_t hash);
+  bool ProbationaryContains(const Slice& key, uint32_t hash);
+  bool ProtectedContains(const Slice& key, uint32_t hash);
+
+ private:
+  SLRUCacheShard probationary_shard_;
+  SLRUCacheShard protected_shard_;
+
+  // If an entry is looked up at least 'lookups_threshold_' times,
+  // it's upgraded to the protected segment.
+  const uint32_t lookups_threshold_;
+
+  // mutex_ protects the state of the shard pair from other threads 
reading/modifying the data.
+  simple_spinlock mutex_;
+
+  DISALLOW_COPY_AND_ASSIGN(SLRUCacheShardPair);
+};
+
+// TODO(mreddy): Remove duplicated code between ShardedSLRUCache and 
ShardedCache in cache.cc
+class ShardedSLRUCache : public Cache {
+ public:
+  explicit ShardedSLRUCache(size_t probationary_capacity, size_t 
protected_capacity,
+                            const std::string& id, const uint32_t lookups);
+
+  ~ShardedSLRUCache() override = default;
+
+  UniquePendingHandle Allocate(Slice key, int val_len, int charge) override;
+
+  UniqueHandle Lookup(const Slice& key, CacheBehavior caching) override;
+
+  void Erase(const Slice& key) override;
+
+  Slice Value(const UniqueHandle& handle) const override;
+
+  uint8_t* MutableValue(UniquePendingHandle* handle) override;
+
+  UniqueHandle Insert(UniquePendingHandle handle,
+                      EvictionCallback* eviction_callback) override;
+
+  void SetMetrics(std::unique_ptr<CacheMetrics> metrics,
+                  ExistingMetricsPolicy metrics_policy) override { }
+
+  size_t Invalidate(const InvalidationControl& ctl) override { return 0; }
+
+ protected:
+  void Release(Handle* handle) override;
+
+  void Free(PendingHandle* h) override;
+
+ private:
+  friend class SLRUCacheBaseTest;
+  static int DetermineShardBits();
+
+  static uint32_t HashSlice(const Slice& s);
+
+  uint32_t Shard(uint32_t hash) const;
+
+  // Needs to be declared before 'shards_' so the destructor for 'shards_' is 
called first.
+  // The destructor for 'mem_tracker_' checks that all the memory consumed has 
been released.
+  // The destructor for 'shards_' must be called first since it releases all 
the memory consumed.
+  std::shared_ptr<MemTracker> mem_tracker_;
+
+  // The first shard in the pair belongs to the probationary segment, second 
one to the protected.
+  std::vector<std::unique_ptr<SLRUCacheShardPair>> shards_;
+
+  // Number of bits of hash used to determine the shard.
+  const int shard_bits_;
+};
+
+// Creates a new SLRU cache with 'probationary_capacity' being the capacity of
+// the probationary segment in bytes, 'protected_capacity' being the capacity 
of the
+// protected segment in bytes, 'id' specifying the identifier, and 'lookups' 
specifying
+// the number of times an entry can be looked up in the probationary segment
+// before being upgraded to the protected segment.
+template <Cache::MemoryType memory_type = Cache::MemoryType::DRAM>
+ShardedSLRUCache* NewSLRUCache(size_t probationary_capacity, size_t 
protected_capacity,
+                               const std::string& id, uint32_t lookups);
+
+} // namespace kudu

Reply via email to