This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 4888f80c refactor: partitioned_lock's elaboration (#1540)
4888f80c is described below

commit 4888f80c537c8d65d771abf16f384b743972ab2b
Author: Hugh Chern <[email protected]>
AuthorDate: Tue Jul 9 14:56:08 2024 +0800

    refactor: partitioned_lock's elaboration (#1540)
    
    ## Rationale
    Extended the `try_new` interface while keeping the old one for
    compatibility.
    
    ## Detailed Changes
    * Implemented the `try_new_suggest_cap` method, while changing the old
    `try_new` method to `try_new_bit_len` to ensure compatibility.
    * Modified structs and functions that call old interfaces.
    
    ## Test Plan
    * Added new unit tests
    * Passed CI test
    
    ---------
    
    Co-authored-by: chunhao.ch <[email protected]>
---
 src/components/object_store/src/disk_cache.rs |   4 +-
 src/components/object_store/src/mem_cache.rs  |   2 +-
 src/components/partitioned_lock/src/lib.rs    | 216 +++++++++++++++++++++-----
 3 files changed, 182 insertions(+), 40 deletions(-)

diff --git a/src/components/object_store/src/disk_cache.rs 
b/src/components/object_store/src/disk_cache.rs
index 981e6d06..b0c7ba13 100644
--- a/src/components/object_store/src/disk_cache.rs
+++ b/src/components/object_store/src/disk_cache.rs
@@ -296,7 +296,7 @@ impl DiskCache {
 
         Ok(Self {
             root_dir,
-            meta_cache: Arc::new(PartitionedMutex::try_new(
+            meta_cache: Arc::new(PartitionedMutex::try_new_with_bit_len(
                 init_lru,
                 partition_bits,
                 SeaHasherBuilder {},
@@ -545,7 +545,7 @@ impl DiskCacheStore {
             assert!(cap_per_part > 0);
             Ok(LruCache::new(cap_per_part))
         };
-        let meta_cache = PartitionedMutex::try_new(
+        let meta_cache = PartitionedMutex::try_new_with_bit_len(
             init_size_lru,
             FILE_SIZE_CACHE_PARTITION_BITS,
             SeaHasherBuilder,
diff --git a/src/components/object_store/src/mem_cache.rs 
b/src/components/object_store/src/mem_cache.rs
index 001be2ab..f602eee6 100644
--- a/src/components/object_store/src/mem_cache.rs
+++ b/src/components/object_store/src/mem_cache.rs
@@ -81,7 +81,7 @@ impl MemCache {
             ))
         };
 
-        let inner = PartitionedMutex::try_new(
+        let inner = PartitionedMutex::try_new_with_bit_len(
             init_lru,
             partition_bits,
             build_fixed_seed_ahasher_builder(),
diff --git a/src/components/partitioned_lock/src/lib.rs 
b/src/components/partitioned_lock/src/lib.rs
index de7ba345..22273b97 100644
--- a/src/components/partitioned_lock/src/lib.rs
+++ b/src/components/partitioned_lock/src/lib.rs
@@ -36,20 +36,30 @@ impl<T, B> PartitionedRwLock<T, B>
 where
     B: BuildHasher,
 {
-    pub fn try_new<F, E>(init_fn: F, partition_bit: usize, hash_builder: B) -> 
Result<Self, E>
+    /// New cache with capacity set to `2^bit_len`
+    pub fn try_new_with_bit_len<F, E>(
+        init_fn: F,
+        partition_bit_len: usize,
+        hash_builder: B,
+    ) -> Result<Self, E>
     where
         F: Fn(usize) -> Result<T, E>,
     {
-        let partition_num = 1 << partition_bit;
-        let partitions = (1..partition_num)
-            .map(|_| init_fn(partition_num).map(RwLock::new))
-            .collect::<Result<Vec<RwLock<T>>, E>>()?;
+        let partition_num = 1 << partition_bit_len;
+        PartitionedRwLock::try_new(init_fn, partition_num, hash_builder)
+    }
 
-        Ok(Self {
-            partitions,
-            partition_mask: partition_num - 1,
-            hash_builder,
-        })
+    /// New cache with capacity round to `suggest_cap`'s power of 2
+    pub fn try_new_with_suggest_cap<F, E>(
+        init_fn: F,
+        suggest_cap: usize,
+        hash_builder: B,
+    ) -> Result<Self, E>
+    where
+        F: Fn(usize) -> Result<T, E>,
+    {
+        let partition_num = suggest_cap.next_power_of_two();
+        PartitionedRwLock::try_new(init_fn, partition_num, hash_builder)
     }
 
     pub fn read<K: Eq + Hash>(&self, key: &K) -> RwLockReadGuard<'_, T> {
@@ -68,6 +78,22 @@ where
         &self.partitions[(self.hash_builder.hash_one(key) as usize) & 
self.partition_mask]
     }
 
+    #[inline]
+    fn try_new<F, E>(init_fn: F, partition_num: usize, hash_builder: B) -> 
Result<Self, E>
+    where
+        F: Fn(usize) -> Result<T, E>,
+    {
+        let partitions = (0..partition_num)
+            .map(|_| init_fn(partition_num).map(RwLock::new))
+            .collect::<Result<Vec<RwLock<T>>, E>>()?;
+
+        Ok(Self {
+            partitions,
+            partition_mask: partition_num - 1,
+            hash_builder,
+        })
+    }
+
     #[cfg(test)]
     fn get_partition_by_index(&self, index: usize) -> &RwLock<T> {
         &self.partitions[index]
@@ -89,20 +115,30 @@ impl<T, B> PartitionedMutex<T, B>
 where
     B: BuildHasher,
 {
-    pub fn try_new<F, E>(init_fn: F, partition_bit: usize, hash_builder: B) -> 
Result<Self, E>
+    /// New cache with capacity set to `2^bit_len`
+    pub fn try_new_with_bit_len<F, E>(
+        init_fn: F,
+        partition_bit_len: usize,
+        hash_builder: B,
+    ) -> Result<Self, E>
     where
         F: Fn(usize) -> Result<T, E>,
     {
-        let partition_num = 1 << partition_bit;
-        let partitions = (0..partition_num)
-            .map(|_| init_fn(partition_num).map(Mutex::new))
-            .collect::<Result<Vec<Mutex<T>>, E>>()?;
+        let partition_num = 1 << partition_bit_len;
+        PartitionedMutex::try_new(init_fn, partition_num, hash_builder)
+    }
 
-        Ok(Self {
-            partitions,
-            partition_mask: partition_num - 1,
-            hash_builder,
-        })
+    /// New cache with capacity round to `suggest_cap`'s power of 2
+    pub fn try_new_with_suggest_cap<F, E>(
+        init_fn: F,
+        suggest_cap: usize,
+        hash_builder: B,
+    ) -> Result<Self, E>
+    where
+        F: Fn(usize) -> Result<T, E>,
+    {
+        let partition_num = suggest_cap.next_power_of_two();
+        PartitionedMutex::try_new(init_fn, partition_num, hash_builder)
     }
 
     pub fn lock<K: Eq + Hash>(&self, key: &K) -> MutexGuard<'_, T> {
@@ -115,6 +151,22 @@ where
         &self.partitions[(self.hash_builder.hash_one(key) as usize) & 
self.partition_mask]
     }
 
+    #[inline]
+    fn try_new<F, E>(init_fn: F, partition_num: usize, hash_builder: B) -> 
Result<Self, E>
+    where
+        F: Fn(usize) -> Result<T, E>,
+    {
+        let partitions = (0..partition_num)
+            .map(|_| init_fn(partition_num).map(Mutex::new))
+            .collect::<Result<Vec<Mutex<T>>, E>>()?;
+
+        Ok(Self {
+            partitions,
+            partition_mask: partition_num - 1,
+            hash_builder,
+        })
+    }
+
     #[cfg(test)]
     fn get_partition_by_index(&self, index: usize) -> &Mutex<T> {
         &self.partitions[index]
@@ -140,11 +192,43 @@ impl<T, B> PartitionedMutexAsync<T, B>
 where
     B: BuildHasher,
 {
-    pub fn try_new<F, E>(init_fn: F, partition_bit: usize, hash_builder: B) -> 
Result<Self, E>
+    /// New cache with capacity set to `2^bit_len`
+    pub fn try_new_with_bit_len<F, E>(
+        init_fn: F,
+        partition_bit_len: usize,
+        hash_builder: B,
+    ) -> Result<Self, E>
+    where
+        F: Fn(usize) -> Result<T, E>,
+    {
+        let partition_num = 1 << partition_bit_len;
+        PartitionedMutexAsync::try_new(init_fn, partition_num, hash_builder)
+    }
+
+    /// New cache with capacity round to `suggest_cap`'s power of 2
+    pub fn try_new_with_suggest_cap<F, E>(
+        init_fn: F,
+        suggest_cap: usize,
+        hash_builder: B,
+    ) -> Result<Self, E>
+    where
+        F: Fn(usize) -> Result<T, E>,
+    {
+        let partition_num = suggest_cap.next_power_of_two();
+        PartitionedMutexAsync::try_new(init_fn, partition_num, hash_builder)
+    }
+
+    pub async fn lock<K: Eq + Hash>(&self, key: &K) -> 
tokio::sync::MutexGuard<'_, T> {
+        let mutex = self.get_partition(key);
+
+        mutex.lock().await
+    }
+
+    #[inline]
+    fn try_new<F, E>(init_fn: F, partition_num: usize, hash_builder: B) -> 
Result<Self, E>
     where
         F: Fn(usize) -> Result<T, E>,
     {
-        let partition_num = 1 << partition_bit;
         let partitions = (0..partition_num)
             .map(|_| init_fn(partition_num).map(tokio::sync::Mutex::new))
             .collect::<Result<Vec<tokio::sync::Mutex<T>>, E>>()?;
@@ -156,12 +240,6 @@ where
         })
     }
 
-    pub async fn lock<K: Eq + Hash>(&self, key: &K) -> 
tokio::sync::MutexGuard<'_, T> {
-        let mutex = self.get_partition(key);
-
-        mutex.lock().await
-    }
-
     fn get_partition<K: Eq + Hash>(&self, key: &K) -> &tokio::sync::Mutex<T> {
         &self.partitions[(self.hash_builder.hash_one(key) as usize) & 
self.partition_mask]
     }
@@ -181,11 +259,66 @@ mod tests {
 
     use super::*;
 
+    #[test]
+    fn test_new_equivalence() {
+        let init_42 = |_: usize| Ok::<_, ()>(42);
+
+        let test_rwlock_42_bit_len =
+            PartitionedRwLock::try_new_with_bit_len(init_42, 4, 
build_fixed_seed_ahasher_builder())
+                .unwrap();
+        let test_rwlock_42_suggest_cap = 
PartitionedRwLock::try_new_with_suggest_cap(
+            init_42,
+            13,
+            build_fixed_seed_ahasher_builder(),
+        )
+        .unwrap();
+
+        let test_mutex_42_bit_len =
+            PartitionedMutex::try_new_with_bit_len(init_42, 4, 
build_fixed_seed_ahasher_builder())
+                .unwrap();
+        let test_mutex_42_suggest_cap = 
PartitionedMutex::try_new_with_suggest_cap(
+            init_42,
+            16,
+            build_fixed_seed_ahasher_builder(),
+        )
+        .unwrap();
+
+        let test_mutex_async_42_bit_len = 
PartitionedMutexAsync::try_new_with_bit_len(
+            init_42,
+            4,
+            build_fixed_seed_ahasher_builder(),
+        )
+        .unwrap();
+        let test_mutex_async_42_suggest_cap = 
PartitionedMutexAsync::try_new_with_suggest_cap(
+            init_42,
+            13,
+            build_fixed_seed_ahasher_builder(),
+        )
+        .unwrap();
+
+        assert_eq!(
+            test_rwlock_42_bit_len.partition_mask,
+            test_rwlock_42_suggest_cap.partition_mask
+        );
+        assert_eq!(
+            test_mutex_42_bit_len.partition_mask,
+            test_mutex_42_suggest_cap.partition_mask
+        );
+        assert_eq!(
+            test_mutex_async_42_bit_len.partition_mask,
+            test_mutex_async_42_suggest_cap.partition_mask
+        );
+    }
+
     #[test]
     fn test_partitioned_rwlock() {
         let init_hmap = |_: usize| Ok::<_, ()>(HashMap::new());
-        let test_locked_map =
-            PartitionedRwLock::try_new(init_hmap, 4, 
build_fixed_seed_ahasher_builder()).unwrap();
+        let test_locked_map = PartitionedRwLock::try_new_with_bit_len(
+            init_hmap,
+            4,
+            build_fixed_seed_ahasher_builder(),
+        )
+        .unwrap();
         let test_key = "test_key".to_string();
         let test_value = "test_value".to_string();
 
@@ -203,8 +336,12 @@ mod tests {
     #[test]
     fn test_partitioned_mutex() {
         let init_hmap = |_: usize| Ok::<_, ()>(HashMap::new());
-        let test_locked_map =
-            PartitionedMutex::try_new(init_hmap, 4, 
build_fixed_seed_ahasher_builder()).unwrap();
+        let test_locked_map = PartitionedMutex::try_new_with_bit_len(
+            init_hmap,
+            4,
+            build_fixed_seed_ahasher_builder(),
+        )
+        .unwrap();
         let test_key = "test_key".to_string();
         let test_value = "test_value".to_string();
 
@@ -223,7 +360,7 @@ mod tests {
     async fn test_partitioned_mutex_async() {
         let init_hmap = |_: usize| Ok::<_, ()>(HashMap::new());
         let test_locked_map =
-            PartitionedMutexAsync::try_new(init_hmap, 4, 
SeaHasherBuilder).unwrap();
+            PartitionedMutexAsync::try_new_with_bit_len(init_hmap, 4, 
SeaHasherBuilder).unwrap();
         let test_key = "test_key".to_string();
         let test_value = "test_value".to_string();
 
@@ -242,7 +379,8 @@ mod tests {
     fn test_partitioned_mutex_vis_different_partition() {
         let init_vec = |_: usize| Ok::<_, ()>(Vec::<i32>::new());
         let test_locked_map =
-            PartitionedMutex::try_new(init_vec, 4, 
build_fixed_seed_ahasher_builder()).unwrap();
+            PartitionedMutex::try_new_with_bit_len(init_vec, 4, 
build_fixed_seed_ahasher_builder())
+                .unwrap();
         let mutex_first = test_locked_map.get_partition_by_index(0);
 
         let mut _tmp_data = mutex_first.lock().unwrap();
@@ -256,8 +394,12 @@ mod tests {
     #[test]
     fn test_partitioned_rwmutex_vis_different_partition() {
         let init_vec = |_: usize| Ok::<_, ()>(Vec::<i32>::new());
-        let test_locked_map =
-            PartitionedRwLock::try_new(init_vec, 4, 
build_fixed_seed_ahasher_builder()).unwrap();
+        let test_locked_map = PartitionedRwLock::try_new_with_bit_len(
+            init_vec,
+            4,
+            build_fixed_seed_ahasher_builder(),
+        )
+        .unwrap();
         let mutex_first = test_locked_map.get_partition_by_index(0);
         let mut _tmp = mutex_first.write().unwrap();
         assert!(mutex_first.try_write().is_err());
@@ -271,7 +413,7 @@ mod tests {
     async fn test_partitioned_mutex_async_vis_different_partition() {
         let init_vec = |_: usize| Ok::<_, ()>(Vec::<i32>::new());
         let test_locked_map =
-            PartitionedMutexAsync::try_new(init_vec, 4, 
SeaHasherBuilder).unwrap();
+            PartitionedMutexAsync::try_new_with_bit_len(init_vec, 4, 
SeaHasherBuilder).unwrap();
         let mutex_first = test_locked_map.get_partition_by_index(0).await;
 
         let mut _tmp_data = mutex_first.lock().await;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to