This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 5b6aa7f6b KUDU-613: Don't modify refs when moving entries
5b6aa7f6b is described below
commit 5b6aa7f6baac3eb4c14d9d349f81da5f5b8714c1
Author: Mahesh Reddy <[email protected]>
AuthorDate: Tue Nov 26 17:52:09 2024 -0800
KUDU-613: Don't modify refs when moving entries
The current implementation of the SLRU cache temporarily
changes the reference count of entries when moving between
segments of the cache (either during an upgrade or downgrade).
A temporary decrement of the handle's ref count can cause a
concurrent Release call to the entry to think it's the last
reference to the entry and free it when it shouldn't be.
This patch changes this behavior by not modifying the ref
count of entries when moving between segments. A reproduction
scenario to trigger the concurrency error is added to cache-bench.
The existing behavior is to update the mem tracker when moving
entries between segments but that's not necessary. This patch
changes that behavior by not updating the mem tracker for
entries moving between segments.
Change-Id: I643907612d43eba2c5f8dbc19d2f74f88bbca869
Reviewed-on: http://gerrit.cloudera.org:8080/22132
Reviewed-by: Alexey Serbin <[email protected]>
Tested-by: Kudu Jenkins
---
src/kudu/util/cache-bench.cc | 91 ++++++++++++++++++++++++++++++-------------
src/kudu/util/slru_cache.cc | 93 +++++++++++++++++++-------------------------
src/kudu/util/slru_cache.h | 13 ++++---
3 files changed, 111 insertions(+), 86 deletions(-)
diff --git a/src/kudu/util/cache-bench.cc b/src/kudu/util/cache-bench.cc
index 813c54aba..980a2a7c5 100644
--- a/src/kudu/util/cache-bench.cc
+++ b/src/kudu/util/cache-bench.cc
@@ -25,7 +25,6 @@
#include <utility>
#include <vector>
-#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
@@ -41,9 +40,6 @@
#include "kudu/util/slru_cache.h"
#include "kudu/util/test_util.h"
-DEFINE_int32(num_threads, 16, "The number of threads to access the cache
concurrently.");
-DEFINE_int32(run_seconds, 1, "The number of seconds to run the benchmark");
-
using std::atomic;
using std::pair;
using std::string;
@@ -53,15 +49,7 @@ using std::vector;
namespace kudu {
-// Benchmark a 1GB cache.
-static constexpr int kCacheCapacity = 1024 * 1024 * 1024;
-static constexpr int kProbationarySegmentCapacity = 204 * 1024 * 1024;
-static constexpr int kProtectedSegmentCapacity = kCacheCapacity -
kProbationarySegmentCapacity;
static constexpr uint32_t kLookups = 2;
-static constexpr uint16_t kMaxMultiplier = 256;
-
-// Use 4kb entries.
-static constexpr int kEntrySize = 4 * 1024;
// Test parameterization.
struct BenchSetup {
@@ -95,9 +83,34 @@ struct BenchSetup {
Cache::EvictionPolicy eviction_policy;
+ // Default parameters for benchmark. 1GB cache with 4kb entries.
+ struct Params {
+ uint32_t num_threads = 16;
+ uint32_t num_seconds = 1;
+ int cache_capacity = 1024 * 1024 * 1024;
+ int probationary_segment_capacity = 204 * 1024 * 1024;
+ int protected_segment_capacity = cache_capacity -
probationary_segment_capacity;
+ int entry_size = 4 * 1024;
+ uint16_t max_multiplier = 256;
+ bool trigger_concurrency_error = false;
+ };
+ Params params;
+
+ // Reproduction scenario for concurrency error. This set of parameters
reduces the size of the
+ // cache and has the probationary and protected segment to be the same size.
The entry size
+ // is large enough compared to the segment capacity such that only two
entries can fit in each
+ // segment. With there only being a few entries, it's much more likely that
when moving entries
+ // between segments that a concurrent Release call will trigger the error
while the entry's ref
+ // count is temporarily decremented.
+ constexpr static Params kTriggerConcurrencyError
+ {2, 5, 1024 * 1024, 512 * 1024, 512 * 1024, 16 * 1024, 1, true};
+
string ToString() const {
string ret;
ret += ToString(pattern);
+ if (params.trigger_concurrency_error) {
+ ret += " Concurrency error reproduction";
+ }
if (eviction_policy == Cache::EvictionPolicy::SLRU) {
ret += " SLRU";
} else {
@@ -111,10 +124,11 @@ struct BenchSetup {
uint32_t max_key() const {
if (eviction_policy == Cache::EvictionPolicy::SLRU) {
return static_cast<int64_t>(
- (kProbationarySegmentCapacity + kProtectedSegmentCapacity) *
dataset_cache_ratio)
- / kEntrySize;
+ (params.probationary_segment_capacity +
params.protected_segment_capacity)
+ * dataset_cache_ratio)
+ / params.entry_size;
}
- return static_cast<int64_t>(kCacheCapacity * dataset_cache_ratio) /
kEntrySize;
+ return static_cast<int64_t>(params.cache_capacity * dataset_cache_ratio) /
params.entry_size;
}
};
@@ -123,12 +137,20 @@ class CacheBench : public KuduTest,
public:
void SetUp() override {
KuduTest::SetUp();
- const BenchSetup& setup = GetParam();
+ auto setup = GetParam();
if (setup.eviction_policy == Cache::EvictionPolicy::SLRU) {
- cache_.reset(NewSLRUCache(kProbationarySegmentCapacity,
kProtectedSegmentCapacity,
+ cache_.reset(NewSLRUCache(setup.params.probationary_segment_capacity,
+ setup.params.protected_segment_capacity,
"test-cache", kLookups));
+ // For the reproduction scenario, change entry size such that only two
entries fit per
+ // segment. Only is guaranteed when the probationary and protected
segments are the same size.
+ if (setup.params.trigger_concurrency_error) {
+ auto* slru_cache = dynamic_cast<ShardedSLRUCache*>(cache_.get());
+ setup.params.entry_size = setup.params.probationary_segment_capacity
+ / (slru_cache->shards_.size() * 2);
+ }
} else {
- cache_.reset(NewCache(kCacheCapacity, "test-cache"));
+ cache_.reset(NewCache(setup.params.cache_capacity, "test-cache"));
}
}
@@ -152,7 +174,7 @@ class CacheBench : public KuduTest,
break;
case BenchSetup::Pattern::PRE_DETERMINED_FREQUENT_LOOKUPS:
if (frequent) {
- auto small_multiplier = r.Uniform(kMaxMultiplier);
+ auto small_multiplier = r.Uniform(setup.params.max_multiplier);
int_key = large_number * small_multiplier;
} else {
// Rare random key with big value.
@@ -169,9 +191,9 @@ class CacheBench : public KuduTest,
if (h) {
++hits;
} else {
- int entry_size = kEntrySize;
+ int entry_size = setup.params.entry_size;
if (setup.pattern ==
BenchSetup::Pattern::PRE_DETERMINED_FREQUENT_LOOKUPS && !frequent) {
- entry_size = 10000 * kEntrySize;
+ entry_size = 10000 * entry_size;
}
auto ph(cache_->Allocate(
key_slice, /* val_len=*/entry_size, /* charge=*/entry_size));
@@ -215,38 +237,55 @@ class CacheBench : public KuduTest,
INSTANTIATE_TEST_SUITE_P(Patterns, CacheBench,
testing::ValuesIn(std::vector<BenchSetup>{
{BenchSetup::Pattern::ZIPFIAN, 1.0, Cache::EvictionPolicy::LRU},
{BenchSetup::Pattern::ZIPFIAN, 1.0, Cache::EvictionPolicy::SLRU},
+ {BenchSetup::Pattern::ZIPFIAN, 1.0, Cache::EvictionPolicy::SLRU,
+ BenchSetup::kTriggerConcurrencyError},
{BenchSetup::Pattern::ZIPFIAN, 3.0, Cache::EvictionPolicy::LRU},
{BenchSetup::Pattern::ZIPFIAN, 3.0, Cache::EvictionPolicy::SLRU},
+ {BenchSetup::Pattern::ZIPFIAN, 3.0, Cache::EvictionPolicy::SLRU,
+ BenchSetup::kTriggerConcurrencyError},
{BenchSetup::Pattern::UNIFORM, 1.0, Cache::EvictionPolicy::LRU},
{BenchSetup::Pattern::UNIFORM, 1.0, Cache::EvictionPolicy::SLRU},
+ {BenchSetup::Pattern::UNIFORM, 1.0, Cache::EvictionPolicy::SLRU,
+ BenchSetup::kTriggerConcurrencyError},
{BenchSetup::Pattern::UNIFORM, 3.0, Cache::EvictionPolicy::LRU},
{BenchSetup::Pattern::UNIFORM, 3.0, Cache::EvictionPolicy::SLRU},
+ {BenchSetup::Pattern::UNIFORM, 3.0, Cache::EvictionPolicy::SLRU,
+ BenchSetup::kTriggerConcurrencyError},
{BenchSetup::Pattern::PRE_DETERMINED_FREQUENT_LOOKUPS, 1.0,
Cache::EvictionPolicy::LRU},
{BenchSetup::Pattern::PRE_DETERMINED_FREQUENT_LOOKUPS, 1.0,
Cache::EvictionPolicy::SLRU},
+ {BenchSetup::Pattern::PRE_DETERMINED_FREQUENT_LOOKUPS, 1.0,
Cache::EvictionPolicy::SLRU,
+ BenchSetup::kTriggerConcurrencyError},
{BenchSetup::Pattern::PRE_DETERMINED_FREQUENT_LOOKUPS, 3.0,
Cache::EvictionPolicy::LRU},
- {BenchSetup::Pattern::PRE_DETERMINED_FREQUENT_LOOKUPS, 3.0,
Cache::EvictionPolicy::SLRU}
+ {BenchSetup::Pattern::PRE_DETERMINED_FREQUENT_LOOKUPS, 3.0,
Cache::EvictionPolicy::SLRU},
+ {BenchSetup::Pattern::PRE_DETERMINED_FREQUENT_LOOKUPS, 3.0,
Cache::EvictionPolicy::SLRU,
+ BenchSetup::kTriggerConcurrencyError}
}));
TEST_P(CacheBench, RunBench) {
const BenchSetup& setup = GetParam();
+ if (setup.params.trigger_concurrency_error) {
+ SKIP_IF_SLOW_NOT_ALLOWED();
+ }
+
Random r(GetRandomSeed32());
- uint32_t large_number_max = setup.max_key() / kMaxMultiplier;
+ uint32_t large_number_max = setup.max_key() / setup.params.max_multiplier;
uint32_t large_number = r.Uniform(large_number_max);
// Run a short warmup phase to try to populate the cache. Otherwise, even if
the
// dataset is smaller than the cache capacity, we would count a bunch of
misses
// during the warm-up phase.
LOG(INFO) << "Warming up...";
- RunQueryThreads(FLAGS_num_threads, 1, large_number);
+ RunQueryThreads(setup.params.num_threads, 1, large_number);
LOG(INFO) << "Running benchmark...";
- pair<int64_t, int64_t> hits_lookups = RunQueryThreads(FLAGS_num_threads,
FLAGS_run_seconds,
+ pair<int64_t, int64_t> hits_lookups =
RunQueryThreads(setup.params.num_threads,
+
setup.params.num_seconds,
large_number);
int64_t hits = hits_lookups.first;
int64_t lookups = hits_lookups.second;
- int64_t l_per_sec = lookups / FLAGS_run_seconds;
+ int64_t l_per_sec = lookups / setup.params.num_seconds;
double hit_rate = static_cast<double>(hits) / lookups;
string test_case = setup.ToString();
LOG(INFO) << test_case << ": " << HumanReadableNum::ToString(l_per_sec) << "
lookups/sec";
diff --git a/src/kudu/util/slru_cache.cc b/src/kudu/util/slru_cache.cc
index bdd5dff06..d6184453c 100644
--- a/src/kudu/util/slru_cache.cc
+++ b/src/kudu/util/slru_cache.cc
@@ -98,14 +98,6 @@ void SLRUCacheShard<segment>::FreeEntry(SLRUHandle* e) {
delete [] e;
}
-template<Segment segment>
-void SLRUCacheShard<segment>::SoftFreeEntry(SLRUHandle* e) {
- UpdateMemTracker(-static_cast<int64_t>(e->charge));
- if (PREDICT_TRUE(metrics_)) {
- UpdateMetricsEviction(e->charge);
- }
-}
-
template<>
void SLRUCacheShard<Segment::kProbationary>::UpdateMetricsEviction(size_t
charge) {
metrics_->probationary_segment_cache_usage->DecrementBy(charge);
@@ -245,6 +237,31 @@ void SLRUCacheShard<segment>::Release(Handle* handle) {
}
}
+template<Segment segment>
+void SLRUCacheShard<segment>::RemoveEntriesPastCapacity() {
+ while (usage_ > capacity_ && rl_.next != &rl_) {
+ SLRUHandle* old = rl_.next;
+ RL_Remove(old);
+ table_.Remove(old->key(), old->hash);
+ if (Unref(old)) {
+ FreeEntry(old);
+ }
+ }
+}
+
+template<Segment segment>
+void
SLRUCacheShard<segment>::SoftRemoveEntriesPastCapacity(vector<SLRUHandle*>*
evicted_entries) {
+ while (usage_ > capacity_ && rl_.next != &rl_) {
+ SLRUHandle* old = rl_.next;
+ RL_Remove(old);
+ table_.Remove(old->key(), old->hash);
+ if (PREDICT_TRUE(metrics_)) {
+ UpdateMetricsEviction(old->charge);
+ }
+ evicted_entries->emplace_back(old);
+ }
+}
+
template<>
Handle* SLRUCacheShard<Segment::kProbationary>::Insert(SLRUHandle* handle,
EvictionCallback*
eviction_callback) {
@@ -262,21 +279,14 @@ Handle*
SLRUCacheShard<Segment::kProbationary>::Insert(SLRUHandle* handle,
RL_Append(handle);
SLRUHandle* old_entry = table_.Insert(handle);
+ // If entry with key already exists, remove it.
if (old_entry != nullptr) {
RL_Remove(old_entry);
if (Unref(old_entry)) {
FreeEntry(old_entry);
}
}
-
- while (usage_ > capacity_ && rl_.next != &rl_) {
- SLRUHandle* old = rl_.next;
- RL_Remove(old);
- table_.Remove(old->key(), old->hash);
- if (Unref(old)) {
- FreeEntry(old);
- }
- }
+ RemoveEntriesPastCapacity();
return reinterpret_cast<Handle*>(handle);
}
@@ -284,16 +294,13 @@ Handle*
SLRUCacheShard<Segment::kProbationary>::Insert(SLRUHandle* handle,
template<>
vector<SLRUHandle*>
SLRUCacheShard<Segment::kProtected>::InsertAndReturnEvicted(
SLRUHandle* handle) {
- handle->refs.fetch_add(1, std::memory_order_relaxed);
handle->in_protected_segment.store(true, std::memory_order_relaxed);
- UpdateMemTracker(handle->charge);
if (PREDICT_TRUE(metrics_)) {
metrics_->upgrades->Increment();
metrics_->protected_segment_cache_usage->IncrementBy(handle->charge);
metrics_->protected_segment_inserts->Increment();
}
- vector<SLRUHandle*> evicted_entries;
handle->Sanitize();
RL_Append(handle);
@@ -301,15 +308,8 @@ vector<SLRUHandle*>
SLRUCacheShard<Segment::kProtected>::InsertAndReturnEvicted(
SLRUHandle* old_entry = table_.Insert(handle);
DCHECK(old_entry == nullptr);
- while (usage_ > capacity_ && rl_.next != &rl_) {
- SLRUHandle* old = rl_.next;
- RL_Remove(old);
- table_.Remove(old->key(), old->hash);
- Unref(old);
- SoftFreeEntry(old);
- evicted_entries.emplace_back(old);
- }
-
+ vector<SLRUHandle*> evicted_entries;
+ SoftRemoveEntriesPastCapacity(&evicted_entries);
return evicted_entries;
}
@@ -318,6 +318,9 @@ Handle*
SLRUCacheShard<Segment::kProtected>::ProtectedInsert(SLRUHandle* handle,
EvictionCallback*
eviction_callback,
vector<SLRUHandle*>* evictions) {
handle->eviction_callback = eviction_callback;
+ // Two refs for the handle: one from SLRUCacheShard, one for the returned
handle.
+ // Even though this function is for updates in the protected segment, it's
treated similarly
+ // to Insert() in the probationary segment.
handle->refs.store(2, std::memory_order_relaxed);
handle->in_protected_segment.store(true, std::memory_order_relaxed);
UpdateMemTracker(handle->charge);
@@ -330,7 +333,7 @@ Handle*
SLRUCacheShard<Segment::kProtected>::ProtectedInsert(SLRUHandle* handle,
RL_Append(handle);
- // Upsert case so Insert should return a non-null entry.
+ // Update case so Insert should return a non-null entry.
SLRUHandle* old_entry = table_.Insert(handle);
DCHECK(old_entry != nullptr);
RL_Remove(old_entry);
@@ -338,44 +341,25 @@ Handle*
SLRUCacheShard<Segment::kProtected>::ProtectedInsert(SLRUHandle* handle,
FreeEntry(old_entry);
}
- while (usage_ > capacity_ && rl_.next != &rl_) {
- SLRUHandle* old = rl_.next;
- RL_Remove(old);
- table_.Remove(old->key(), old->hash);
- Unref(old);
- SoftFreeEntry(old);
- evictions->emplace_back(old);
- }
-
+ SoftRemoveEntriesPastCapacity(evictions);
return reinterpret_cast<Handle*>(handle);
}
template<>
void SLRUCacheShard<Segment::kProbationary>::ReInsert(SLRUHandle* handle) {
- handle->refs.fetch_add(1, std::memory_order_relaxed);
handle->in_protected_segment.store(false, std::memory_order_relaxed);
- UpdateMemTracker(handle->charge);
if (PREDICT_TRUE(metrics_)) {
metrics_->downgrades->Increment();
metrics_->probationary_segment_cache_usage->IncrementBy(handle->charge);
metrics_->probationary_segment_inserts->Increment();
}
-
handle->Sanitize();
RL_Append(handle);
// No entries should exist with same key in probationary segment when
downgrading.
SLRUHandle* old_entry = table_.Insert(handle);
DCHECK(old_entry == nullptr);
-
- while (usage_ > capacity_ && rl_.next != &rl_) {
- SLRUHandle* old = rl_.next;
- RL_Remove(old);
- table_.Remove(old->key(), old->hash);
- if (Unref(old)) {
- FreeEntry(old);
- }
- }
+ RemoveEntriesPastCapacity();
}
template<Segment segment>
@@ -395,8 +379,9 @@ void SLRUCacheShard<segment>::SoftErase(const Slice& key,
uint32_t hash) {
SLRUHandle* e = table_.Remove(key, hash);
if (e != nullptr) {
RL_Remove(e);
- Unref(e);
- SoftFreeEntry(e);
+ if (PREDICT_TRUE(metrics_)) {
+ UpdateMetricsEviction(e->charge);
+ }
}
}
@@ -420,7 +405,7 @@ void SLRUCacheShardPair::SetMetrics(SLRUCacheMetrics*
metrics) {
// Commit a prepared entry into the probationary segment if entry does not
exist or if it
// exists in the probationary segment (upsert case).
-// If entry exists in protected segment, entry will be upserted and any
evicted entries will
+// If entry exists in protected segment, entry will be updated and any evicted
entries will
// be properly downgraded to the probationary segment.
// Look at Cache::Insert() for more details.
Handle* SLRUCacheShardPair::Insert(SLRUHandle* handle,
diff --git a/src/kudu/util/slru_cache.h b/src/kudu/util/slru_cache.h
index e1f8e97cb..3aca5e0fb 100644
--- a/src/kudu/util/slru_cache.h
+++ b/src/kudu/util/slru_cache.h
@@ -121,7 +121,7 @@ class SLRUCacheShard {
// Used when upgrading entry to protected segment.
std::vector<SLRUHandle*> InsertAndReturnEvicted(SLRUHandle* handle);
// Same as InsertAndReturnEvicted but it returns the inserted handle too.
- // Used for upsert case in protected segment.
+ // Used for update case in protected segment.
Handle* ProtectedInsert(SLRUHandle* handle,
EvictionCallback* eviction_callback,
std::vector<SLRUHandle*>* evictions);
@@ -136,7 +136,7 @@ class SLRUCacheShard {
void SoftErase(const Slice& key, uint32_t hash);
// Returns true if shard contains entry, false if not.
bool Contains(const Slice& key, uint32_t hash);
- // Like Insert but sets refs to 1 and no possibility for upsert case.
+ // Like Insert but sets refs to 1 and no possibility for update case.
// Used when evicted entries from protected segment are being added to
probationary segment.
void ReInsert(SLRUHandle* handle);
// Update the high-level metrics for a lookup operation.
@@ -154,12 +154,12 @@ class SLRUCacheShard {
static bool Unref(SLRUHandle* e);
// Call the user's eviction callback, if it exists, and free the entry.
void FreeEntry(SLRUHandle* e);
- // Updates memtracker to reflect entry being erased from cache.
- // Unlike FreeEntry(), the eviction callback is not called and the entry is
not freed.
- void SoftFreeEntry(SLRUHandle* e);
// Updates eviction related metrics.
void UpdateMetricsEviction(size_t charge);
-
+ // Removes any entries past capacity.
+ void RemoveEntriesPastCapacity();
+ // Adds any entries past capacity to a vector to be processed later.
+ void SoftRemoveEntriesPastCapacity(std::vector<SLRUHandle*>*
evicted_entries);
// Update the memtracker's consumption by the given amount.
//
@@ -269,6 +269,7 @@ class ShardedSLRUCache : public Cache {
private:
friend class SLRUCacheBaseTest;
+ friend class CacheBench;
FRIEND_TEST(SLRUCacheTest, EntriesArePinned);
static int DetermineShardBits();