This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 4888f80c refactor: partitioned_lock's elaboration (#1540)
4888f80c is described below
commit 4888f80c537c8d65d771abf16f384b743972ab2b
Author: Hugh Chern <[email protected]>
AuthorDate: Tue Jul 9 14:56:08 2024 +0800
refactor: partitioned_lock's elaboration (#1540)
## Rationale
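Extended the `try_new` constructor of the partitioned locks so that the
number of partitions can be given either as a bit length or as a suggested
capacity, while keeping the old bit-length behavior for compatibility.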
## Detailed Changes
* Implemented the `try_new_with_suggest_cap` method and renamed the old
`try_new` method to `try_new_with_bit_len` so its behavior stays available;
both now delegate to a private `try_new` that takes the partition count.
* Updated the callers of the old interface (`DiskCache`, `DiskCacheStore`,
`MemCache` and the existing unit tests) to use `try_new_with_bit_len`.
## Test Plan
* Added new unit tests
* Passed CI test
---------
Co-authored-by: chunhao.ch <[email protected]>
---
src/components/object_store/src/disk_cache.rs | 4 +-
src/components/object_store/src/mem_cache.rs | 2 +-
src/components/partitioned_lock/src/lib.rs | 216 +++++++++++++++++++++-----
3 files changed, 182 insertions(+), 40 deletions(-)
diff --git a/src/components/object_store/src/disk_cache.rs
b/src/components/object_store/src/disk_cache.rs
index 981e6d06..b0c7ba13 100644
--- a/src/components/object_store/src/disk_cache.rs
+++ b/src/components/object_store/src/disk_cache.rs
@@ -296,7 +296,7 @@ impl DiskCache {
Ok(Self {
root_dir,
- meta_cache: Arc::new(PartitionedMutex::try_new(
+ meta_cache: Arc::new(PartitionedMutex::try_new_with_bit_len(
init_lru,
partition_bits,
SeaHasherBuilder {},
@@ -545,7 +545,7 @@ impl DiskCacheStore {
assert!(cap_per_part > 0);
Ok(LruCache::new(cap_per_part))
};
- let meta_cache = PartitionedMutex::try_new(
+ let meta_cache = PartitionedMutex::try_new_with_bit_len(
init_size_lru,
FILE_SIZE_CACHE_PARTITION_BITS,
SeaHasherBuilder,
diff --git a/src/components/object_store/src/mem_cache.rs
b/src/components/object_store/src/mem_cache.rs
index 001be2ab..f602eee6 100644
--- a/src/components/object_store/src/mem_cache.rs
+++ b/src/components/object_store/src/mem_cache.rs
@@ -81,7 +81,7 @@ impl MemCache {
))
};
- let inner = PartitionedMutex::try_new(
+ let inner = PartitionedMutex::try_new_with_bit_len(
init_lru,
partition_bits,
build_fixed_seed_ahasher_builder(),
diff --git a/src/components/partitioned_lock/src/lib.rs
b/src/components/partitioned_lock/src/lib.rs
index de7ba345..22273b97 100644
--- a/src/components/partitioned_lock/src/lib.rs
+++ b/src/components/partitioned_lock/src/lib.rs
@@ -36,20 +36,30 @@ impl<T, B> PartitionedRwLock<T, B>
where
B: BuildHasher,
{
- pub fn try_new<F, E>(init_fn: F, partition_bit: usize, hash_builder: B) ->
Result<Self, E>
+ /// New cache with capacity set to `2^bit_len`
+ pub fn try_new_with_bit_len<F, E>(
+ init_fn: F,
+ partition_bit_len: usize,
+ hash_builder: B,
+ ) -> Result<Self, E>
where
F: Fn(usize) -> Result<T, E>,
{
- let partition_num = 1 << partition_bit;
- let partitions = (1..partition_num)
- .map(|_| init_fn(partition_num).map(RwLock::new))
- .collect::<Result<Vec<RwLock<T>>, E>>()?;
+ let partition_num = 1 << partition_bit_len;
+ PartitionedRwLock::try_new(init_fn, partition_num, hash_builder)
+ }
- Ok(Self {
- partitions,
- partition_mask: partition_num - 1,
- hash_builder,
- })
+ /// New cache with capacity round to `suggest_cap`'s power of 2
+ pub fn try_new_with_suggest_cap<F, E>(
+ init_fn: F,
+ suggest_cap: usize,
+ hash_builder: B,
+ ) -> Result<Self, E>
+ where
+ F: Fn(usize) -> Result<T, E>,
+ {
+ let partition_num = suggest_cap.next_power_of_two();
+ PartitionedRwLock::try_new(init_fn, partition_num, hash_builder)
}
pub fn read<K: Eq + Hash>(&self, key: &K) -> RwLockReadGuard<'_, T> {
@@ -68,6 +78,22 @@ where
&self.partitions[(self.hash_builder.hash_one(key) as usize) &
self.partition_mask]
}
+ #[inline]
+ fn try_new<F, E>(init_fn: F, partition_num: usize, hash_builder: B) ->
Result<Self, E>
+ where
+ F: Fn(usize) -> Result<T, E>,
+ {
+ let partitions = (0..partition_num)
+ .map(|_| init_fn(partition_num).map(RwLock::new))
+ .collect::<Result<Vec<RwLock<T>>, E>>()?;
+
+ Ok(Self {
+ partitions,
+ partition_mask: partition_num - 1,
+ hash_builder,
+ })
+ }
+
#[cfg(test)]
fn get_partition_by_index(&self, index: usize) -> &RwLock<T> {
&self.partitions[index]
@@ -89,20 +115,30 @@ impl<T, B> PartitionedMutex<T, B>
where
B: BuildHasher,
{
- pub fn try_new<F, E>(init_fn: F, partition_bit: usize, hash_builder: B) ->
Result<Self, E>
+ /// New cache with capacity set to `2^bit_len`
+ pub fn try_new_with_bit_len<F, E>(
+ init_fn: F,
+ partition_bit_len: usize,
+ hash_builder: B,
+ ) -> Result<Self, E>
where
F: Fn(usize) -> Result<T, E>,
{
- let partition_num = 1 << partition_bit;
- let partitions = (0..partition_num)
- .map(|_| init_fn(partition_num).map(Mutex::new))
- .collect::<Result<Vec<Mutex<T>>, E>>()?;
+ let partition_num = 1 << partition_bit_len;
+ PartitionedMutex::try_new(init_fn, partition_num, hash_builder)
+ }
- Ok(Self {
- partitions,
- partition_mask: partition_num - 1,
- hash_builder,
- })
+ /// New cache with capacity round to `suggest_cap`'s power of 2
+ pub fn try_new_with_suggest_cap<F, E>(
+ init_fn: F,
+ suggest_cap: usize,
+ hash_builder: B,
+ ) -> Result<Self, E>
+ where
+ F: Fn(usize) -> Result<T, E>,
+ {
+ let partition_num = suggest_cap.next_power_of_two();
+ PartitionedMutex::try_new(init_fn, partition_num, hash_builder)
}
pub fn lock<K: Eq + Hash>(&self, key: &K) -> MutexGuard<'_, T> {
@@ -115,6 +151,22 @@ where
&self.partitions[(self.hash_builder.hash_one(key) as usize) &
self.partition_mask]
}
+ #[inline]
+ fn try_new<F, E>(init_fn: F, partition_num: usize, hash_builder: B) ->
Result<Self, E>
+ where
+ F: Fn(usize) -> Result<T, E>,
+ {
+ let partitions = (0..partition_num)
+ .map(|_| init_fn(partition_num).map(Mutex::new))
+ .collect::<Result<Vec<Mutex<T>>, E>>()?;
+
+ Ok(Self {
+ partitions,
+ partition_mask: partition_num - 1,
+ hash_builder,
+ })
+ }
+
#[cfg(test)]
fn get_partition_by_index(&self, index: usize) -> &Mutex<T> {
&self.partitions[index]
@@ -140,11 +192,43 @@ impl<T, B> PartitionedMutexAsync<T, B>
where
B: BuildHasher,
{
- pub fn try_new<F, E>(init_fn: F, partition_bit: usize, hash_builder: B) ->
Result<Self, E>
+ /// New cache with capacity set to `2^bit_len`
+ pub fn try_new_with_bit_len<F, E>(
+ init_fn: F,
+ partition_bit_len: usize,
+ hash_builder: B,
+ ) -> Result<Self, E>
+ where
+ F: Fn(usize) -> Result<T, E>,
+ {
+ let partition_num = 1 << partition_bit_len;
+ PartitionedMutexAsync::try_new(init_fn, partition_num, hash_builder)
+ }
+
+ /// New cache with capacity round to `suggest_cap`'s power of 2
+ pub fn try_new_with_suggest_cap<F, E>(
+ init_fn: F,
+ suggest_cap: usize,
+ hash_builder: B,
+ ) -> Result<Self, E>
+ where
+ F: Fn(usize) -> Result<T, E>,
+ {
+ let partition_num = suggest_cap.next_power_of_two();
+ PartitionedMutexAsync::try_new(init_fn, partition_num, hash_builder)
+ }
+
+ pub async fn lock<K: Eq + Hash>(&self, key: &K) ->
tokio::sync::MutexGuard<'_, T> {
+ let mutex = self.get_partition(key);
+
+ mutex.lock().await
+ }
+
+ #[inline]
+ fn try_new<F, E>(init_fn: F, partition_num: usize, hash_builder: B) ->
Result<Self, E>
where
F: Fn(usize) -> Result<T, E>,
{
- let partition_num = 1 << partition_bit;
let partitions = (0..partition_num)
.map(|_| init_fn(partition_num).map(tokio::sync::Mutex::new))
.collect::<Result<Vec<tokio::sync::Mutex<T>>, E>>()?;
@@ -156,12 +240,6 @@ where
})
}
- pub async fn lock<K: Eq + Hash>(&self, key: &K) ->
tokio::sync::MutexGuard<'_, T> {
- let mutex = self.get_partition(key);
-
- mutex.lock().await
- }
-
fn get_partition<K: Eq + Hash>(&self, key: &K) -> &tokio::sync::Mutex<T> {
&self.partitions[(self.hash_builder.hash_one(key) as usize) &
self.partition_mask]
}
@@ -181,11 +259,66 @@ mod tests {
use super::*;
+ #[test]
+ fn test_new_equivalence() {
+ let init_42 = |_: usize| Ok::<_, ()>(42);
+
+ let test_rwlock_42_bit_len =
+ PartitionedRwLock::try_new_with_bit_len(init_42, 4,
build_fixed_seed_ahasher_builder())
+ .unwrap();
+ let test_rwlock_42_suggest_cap =
PartitionedRwLock::try_new_with_suggest_cap(
+ init_42,
+ 13,
+ build_fixed_seed_ahasher_builder(),
+ )
+ .unwrap();
+
+ let test_mutex_42_bit_len =
+ PartitionedMutex::try_new_with_bit_len(init_42, 4,
build_fixed_seed_ahasher_builder())
+ .unwrap();
+ let test_mutex_42_suggest_cap =
PartitionedMutex::try_new_with_suggest_cap(
+ init_42,
+ 16,
+ build_fixed_seed_ahasher_builder(),
+ )
+ .unwrap();
+
+ let test_mutex_async_42_bit_len =
PartitionedMutexAsync::try_new_with_bit_len(
+ init_42,
+ 4,
+ build_fixed_seed_ahasher_builder(),
+ )
+ .unwrap();
+ let test_mutex_async_42_suggest_cap =
PartitionedMutexAsync::try_new_with_suggest_cap(
+ init_42,
+ 13,
+ build_fixed_seed_ahasher_builder(),
+ )
+ .unwrap();
+
+ assert_eq!(
+ test_rwlock_42_bit_len.partition_mask,
+ test_rwlock_42_suggest_cap.partition_mask
+ );
+ assert_eq!(
+ test_mutex_42_bit_len.partition_mask,
+ test_mutex_42_suggest_cap.partition_mask
+ );
+ assert_eq!(
+ test_mutex_async_42_bit_len.partition_mask,
+ test_mutex_async_42_suggest_cap.partition_mask
+ );
+ }
+
#[test]
fn test_partitioned_rwlock() {
let init_hmap = |_: usize| Ok::<_, ()>(HashMap::new());
- let test_locked_map =
- PartitionedRwLock::try_new(init_hmap, 4,
build_fixed_seed_ahasher_builder()).unwrap();
+ let test_locked_map = PartitionedRwLock::try_new_with_bit_len(
+ init_hmap,
+ 4,
+ build_fixed_seed_ahasher_builder(),
+ )
+ .unwrap();
let test_key = "test_key".to_string();
let test_value = "test_value".to_string();
@@ -203,8 +336,12 @@ mod tests {
#[test]
fn test_partitioned_mutex() {
let init_hmap = |_: usize| Ok::<_, ()>(HashMap::new());
- let test_locked_map =
- PartitionedMutex::try_new(init_hmap, 4,
build_fixed_seed_ahasher_builder()).unwrap();
+ let test_locked_map = PartitionedMutex::try_new_with_bit_len(
+ init_hmap,
+ 4,
+ build_fixed_seed_ahasher_builder(),
+ )
+ .unwrap();
let test_key = "test_key".to_string();
let test_value = "test_value".to_string();
@@ -223,7 +360,7 @@ mod tests {
async fn test_partitioned_mutex_async() {
let init_hmap = |_: usize| Ok::<_, ()>(HashMap::new());
let test_locked_map =
- PartitionedMutexAsync::try_new(init_hmap, 4,
SeaHasherBuilder).unwrap();
+ PartitionedMutexAsync::try_new_with_bit_len(init_hmap, 4,
SeaHasherBuilder).unwrap();
let test_key = "test_key".to_string();
let test_value = "test_value".to_string();
@@ -242,7 +379,8 @@ mod tests {
fn test_partitioned_mutex_vis_different_partition() {
let init_vec = |_: usize| Ok::<_, ()>(Vec::<i32>::new());
let test_locked_map =
- PartitionedMutex::try_new(init_vec, 4,
build_fixed_seed_ahasher_builder()).unwrap();
+ PartitionedMutex::try_new_with_bit_len(init_vec, 4,
build_fixed_seed_ahasher_builder())
+ .unwrap();
let mutex_first = test_locked_map.get_partition_by_index(0);
let mut _tmp_data = mutex_first.lock().unwrap();
@@ -256,8 +394,12 @@ mod tests {
#[test]
fn test_partitioned_rwmutex_vis_different_partition() {
let init_vec = |_: usize| Ok::<_, ()>(Vec::<i32>::new());
- let test_locked_map =
- PartitionedRwLock::try_new(init_vec, 4,
build_fixed_seed_ahasher_builder()).unwrap();
+ let test_locked_map = PartitionedRwLock::try_new_with_bit_len(
+ init_vec,
+ 4,
+ build_fixed_seed_ahasher_builder(),
+ )
+ .unwrap();
let mutex_first = test_locked_map.get_partition_by_index(0);
let mut _tmp = mutex_first.write().unwrap();
assert!(mutex_first.try_write().is_err());
@@ -271,7 +413,7 @@ mod tests {
async fn test_partitioned_mutex_async_vis_different_partition() {
let init_vec = |_: usize| Ok::<_, ()>(Vec::<i32>::new());
let test_locked_map =
- PartitionedMutexAsync::try_new(init_vec, 4,
SeaHasherBuilder).unwrap();
+ PartitionedMutexAsync::try_new_with_bit_len(init_vec, 4,
SeaHasherBuilder).unwrap();
let mutex_first = test_locked_map.get_partition_by_index(0).await;
let mut _tmp_data = mutex_first.lock().await;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]