This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 14f16c5e4 [#1296] improvement(rust): use std.sync.lock to replace 
tokio lock for better performance (#1216)
14f16c5e4 is described below

commit 14f16c5e4c178109e0e182273886aff2a5de3134
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Nov 3 16:53:35 2023 +0800

    [#1296] improvement(rust): use std.sync.lock to replace tokio lock for 
better performance (#1216)
    
    ### What changes were proposed in this pull request?
    
    From past experience, I have found that the std sync lock performs better than 
the tokio lock,
     especially on the critical path.
    
    And a flattened `dashmap` will be faster than a multi-level `dashmap`.
    
    So this PR improves the above performance points in `app.rs`.
    
    ### Why are the changes needed?
    
    For better performance
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing UTs
---
 rust/experimental/server/src/app.rs  | 47 +++++++++++++++---------------------
 rust/experimental/server/src/grpc.rs | 31 +++++++++---------------
 2 files changed, 31 insertions(+), 47 deletions(-)

diff --git a/rust/experimental/server/src/app.rs 
b/rust/experimental/server/src/app.rs
index 9c1151dfc..9b1596845 100644
--- a/rust/experimental/server/src/app.rs
+++ b/rust/experimental/server/src/app.rs
@@ -23,7 +23,7 @@ use crate::metric::{
 };
 
 use crate::readable_size::ReadableSize;
-
+use crate::runtime::manager::RuntimeManager;
 use crate::store::hybrid::HybridStore;
 use crate::store::memory::MemorySnapshot;
 use crate::store::{
@@ -48,11 +48,9 @@ use std::str::FromStr;
 
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
+use std::sync::RwLock;
 use std::time::Duration;
 
-use crate::runtime::manager::RuntimeManager;
-use tokio::sync::RwLock;
-
 #[derive(Debug, Clone)]
 enum DataDistribution {
     NORMAL,
@@ -86,7 +84,8 @@ pub struct App {
     app_config_options: Option<AppConfigOptions>,
     latest_heartbeat_time: AtomicU64,
     store: Arc<HybridStore>,
-    bitmap_of_blocks: DashMap<i32, DashMap<i32, PartitionedMeta>>,
+    // key: (shuffle_id, partition_id)
+    bitmap_of_blocks: DashMap<(i32, i32), PartitionedMeta>,
     huge_partition_marked_threshold: Option<u64>,
     huge_partition_memory_max_available_size: Option<u64>,
 }
@@ -111,25 +110,25 @@ impl PartitionedMeta {
         }
     }
 
-    async fn get_data_size(&self) -> Result<u64> {
-        let meta = self.inner.read().await;
+    fn get_data_size(&self) -> Result<u64> {
+        let meta = self.inner.read().unwrap();
         Ok(meta.total_size)
     }
 
-    async fn incr_data_size(&mut self, data_size: i32) -> Result<()> {
-        let mut meta = self.inner.write().await;
+    fn incr_data_size(&mut self, data_size: i32) -> Result<()> {
+        let mut meta = self.inner.write().unwrap();
         meta.total_size += data_size as u64;
         Ok(())
     }
 
-    async fn get_block_ids_bytes(&self) -> Result<Bytes> {
-        let meta = self.inner.read().await;
+    fn get_block_ids_bytes(&self) -> Result<Bytes> {
+        let meta = self.inner.read().unwrap();
         let serialized_data = meta.blocks_bitmap.serialize()?;
         Ok(Bytes::from(serialized_data))
     }
 
-    async fn report_block_ids(&mut self, ids: Vec<i64>) -> Result<()> {
-        let mut meta = self.inner.write().await;
+    fn report_block_ids(&mut self, ids: Vec<i64>) -> Result<()> {
+        let mut meta = self.inner.write().unwrap();
         for id in ids {
             meta.blocks_bitmap.add(id as u64);
         }
@@ -177,8 +176,7 @@ impl App {
     pub async fn insert(&self, ctx: WritingViewContext) -> Result<(), 
WorkerError> {
         let len: i32 = ctx.data_blocks.iter().map(|block| block.length).sum();
         self.get_underlying_partition_bitmap(ctx.uid.clone())
-            .incr_data_size(len)
-            .await?;
+            .incr_data_size(len)?;
         TOTAL_RECEIVED_DATA.inc_by(len as u64);
 
         self.store.insert(ctx).await
@@ -205,7 +203,7 @@ impl App {
         let huge_partition_memory = &huge_partition_memory_used.unwrap();
 
         let meta = self.get_underlying_partition_bitmap(uid.clone());
-        let data_size = meta.get_data_size().await?;
+        let data_size = meta.get_data_size()?;
         if data_size > *huge_partition_size
             && self
                 .store
@@ -248,27 +246,23 @@ impl App {
     fn get_underlying_partition_bitmap(&self, uid: PartitionedUId) -> 
PartitionedMeta {
         let shuffle_id = uid.shuffle_id;
         let partition_id = uid.partition_id;
-        let shuffle_entry = self
+        let partitioned_meta = self
             .bitmap_of_blocks
-            .entry(shuffle_id)
-            .or_insert_with(|| DashMap::new());
-        let partitioned_meta = shuffle_entry
-            .entry(partition_id)
+            .entry((shuffle_id, partition_id))
             .or_insert_with(|| PartitionedMeta::new());
-
         partitioned_meta.clone()
     }
 
-    pub async fn get_block_ids(&self, ctx: GetBlocksContext) -> Result<Bytes> {
+    pub fn get_block_ids(&self, ctx: GetBlocksContext) -> Result<Bytes> {
         debug!("get blocks: {:?}", ctx.clone());
         let partitioned_meta = self.get_underlying_partition_bitmap(ctx.uid);
-        partitioned_meta.get_block_ids_bytes().await
+        partitioned_meta.get_block_ids_bytes()
     }
 
     pub async fn report_block_ids(&self, ctx: ReportBlocksContext) -> 
Result<()> {
         debug!("Report blocks: {:?}", ctx.clone());
         let mut partitioned_meta = 
self.get_underlying_partition_bitmap(ctx.uid);
-        partitioned_meta.report_block_ids(ctx.blocks).await?;
+        partitioned_meta.report_block_ids(ctx.blocks)?;
 
         Ok(())
     }
@@ -277,7 +271,7 @@ impl App {
         if shuffle_id.is_some() {
             error!("Partial purge is not supported.");
         } else {
-            self.store.purge(app_id).await?;
+            self.store.purge(app_id).await?
         }
 
         Ok(())
@@ -688,7 +682,6 @@ mod test {
                     partition_id: 0,
                 },
             })
-            .await
             .expect("TODO: panic message");
 
         let deserialized = Treemap::deserialize(&data).unwrap();
diff --git a/rust/experimental/server/src/grpc.rs 
b/rust/experimental/server/src/grpc.rs
index c07bef6b6..42f515046 100644
--- a/rust/experimental/server/src/grpc.rs
+++ b/rust/experimental/server/src/grpc.rs
@@ -428,7 +428,7 @@ impl ShuffleServer for DefaultShuffleServer {
                 },
                 blocks: partition_to_block_id.block_ids,
             };
-            let _ = app.report_block_ids(ctx).await;
+            let _ = app.report_block_ids(ctx);
         }
 
         Ok(Response::new(ReportShuffleResultResponse {
@@ -460,16 +460,9 @@ impl ShuffleServer for DefaultShuffleServer {
             shuffle_id,
             partition_id,
         };
-        let block_ids_result = app
-            .unwrap()
-            .get_block_ids(GetBlocksContext {
-                uid: partition_id.clone(),
-            })
-            .instrument_await(format!(
-                "getting shuffle blocks ids. uid: {:?}",
-                &partition_id
-            ))
-            .await;
+        let block_ids_result = app.unwrap().get_block_ids(GetBlocksContext {
+            uid: partition_id.clone(),
+        });
 
         if block_ids_result.is_err() {
             let err_msg = block_ids_result.err();
@@ -511,15 +504,13 @@ impl ShuffleServer for DefaultShuffleServer {
 
         let mut bytes_mut = BytesMut::new();
         for partition_id in req.partitions {
-            let block_ids_result = app
-                .get_block_ids(GetBlocksContext {
-                    uid: PartitionedUId {
-                        app_id: app_id.clone(),
-                        shuffle_id,
-                        partition_id,
-                    },
-                })
-                .await;
+            let block_ids_result = app.get_block_ids(GetBlocksContext {
+                uid: PartitionedUId {
+                    app_id: app_id.clone(),
+                    shuffle_id,
+                    partition_id,
+                },
+            });
             if block_ids_result.is_err() {
                 let err_msg = block_ids_result.err();
                 error!(

Reply via email to