This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 14f16c5e4 [#1296] improvement(rust): use std.sync.lock to replace
tokio lock for better performance (#1216)
14f16c5e4 is described below
commit 14f16c5e4c178109e0e182273886aff2a5de3134
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Nov 3 16:53:35 2023 +0800
[#1296] improvement(rust): use std.sync.lock to replace tokio lock for
better performance (#1216)
### What changes were proposed in this pull request?
From past experience, I have found that the performance of the std sync lock is better than
that of the tokio lock,
especially on the critical path.
Also, a flattened `dashmap` will be faster than a nested, multi-level `dashmap`.
So this PR improves the above performance points in `app.rs`.
### Why are the changes needed?
For better performance
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UTs
---
rust/experimental/server/src/app.rs | 47 +++++++++++++++---------------------
rust/experimental/server/src/grpc.rs | 31 +++++++++---------------
2 files changed, 31 insertions(+), 47 deletions(-)
diff --git a/rust/experimental/server/src/app.rs
b/rust/experimental/server/src/app.rs
index 9c1151dfc..9b1596845 100644
--- a/rust/experimental/server/src/app.rs
+++ b/rust/experimental/server/src/app.rs
@@ -23,7 +23,7 @@ use crate::metric::{
};
use crate::readable_size::ReadableSize;
-
+use crate::runtime::manager::RuntimeManager;
use crate::store::hybrid::HybridStore;
use crate::store::memory::MemorySnapshot;
use crate::store::{
@@ -48,11 +48,9 @@ use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
+use std::sync::RwLock;
use std::time::Duration;
-use crate::runtime::manager::RuntimeManager;
-use tokio::sync::RwLock;
-
#[derive(Debug, Clone)]
enum DataDistribution {
NORMAL,
@@ -86,7 +84,8 @@ pub struct App {
app_config_options: Option<AppConfigOptions>,
latest_heartbeat_time: AtomicU64,
store: Arc<HybridStore>,
- bitmap_of_blocks: DashMap<i32, DashMap<i32, PartitionedMeta>>,
+ // key: (shuffle_id, partition_id)
+ bitmap_of_blocks: DashMap<(i32, i32), PartitionedMeta>,
huge_partition_marked_threshold: Option<u64>,
huge_partition_memory_max_available_size: Option<u64>,
}
@@ -111,25 +110,25 @@ impl PartitionedMeta {
}
}
- async fn get_data_size(&self) -> Result<u64> {
- let meta = self.inner.read().await;
+ fn get_data_size(&self) -> Result<u64> {
+ let meta = self.inner.read().unwrap();
Ok(meta.total_size)
}
- async fn incr_data_size(&mut self, data_size: i32) -> Result<()> {
- let mut meta = self.inner.write().await;
+ fn incr_data_size(&mut self, data_size: i32) -> Result<()> {
+ let mut meta = self.inner.write().unwrap();
meta.total_size += data_size as u64;
Ok(())
}
- async fn get_block_ids_bytes(&self) -> Result<Bytes> {
- let meta = self.inner.read().await;
+ fn get_block_ids_bytes(&self) -> Result<Bytes> {
+ let meta = self.inner.read().unwrap();
let serialized_data = meta.blocks_bitmap.serialize()?;
Ok(Bytes::from(serialized_data))
}
- async fn report_block_ids(&mut self, ids: Vec<i64>) -> Result<()> {
- let mut meta = self.inner.write().await;
+ fn report_block_ids(&mut self, ids: Vec<i64>) -> Result<()> {
+ let mut meta = self.inner.write().unwrap();
for id in ids {
meta.blocks_bitmap.add(id as u64);
}
@@ -177,8 +176,7 @@ impl App {
pub async fn insert(&self, ctx: WritingViewContext) -> Result<(),
WorkerError> {
let len: i32 = ctx.data_blocks.iter().map(|block| block.length).sum();
self.get_underlying_partition_bitmap(ctx.uid.clone())
- .incr_data_size(len)
- .await?;
+ .incr_data_size(len)?;
TOTAL_RECEIVED_DATA.inc_by(len as u64);
self.store.insert(ctx).await
@@ -205,7 +203,7 @@ impl App {
let huge_partition_memory = &huge_partition_memory_used.unwrap();
let meta = self.get_underlying_partition_bitmap(uid.clone());
- let data_size = meta.get_data_size().await?;
+ let data_size = meta.get_data_size()?;
if data_size > *huge_partition_size
&& self
.store
@@ -248,27 +246,23 @@ impl App {
fn get_underlying_partition_bitmap(&self, uid: PartitionedUId) ->
PartitionedMeta {
let shuffle_id = uid.shuffle_id;
let partition_id = uid.partition_id;
- let shuffle_entry = self
+ let partitioned_meta = self
.bitmap_of_blocks
- .entry(shuffle_id)
- .or_insert_with(|| DashMap::new());
- let partitioned_meta = shuffle_entry
- .entry(partition_id)
+ .entry((shuffle_id, partition_id))
.or_insert_with(|| PartitionedMeta::new());
-
partitioned_meta.clone()
}
- pub async fn get_block_ids(&self, ctx: GetBlocksContext) -> Result<Bytes> {
+ pub fn get_block_ids(&self, ctx: GetBlocksContext) -> Result<Bytes> {
debug!("get blocks: {:?}", ctx.clone());
let partitioned_meta = self.get_underlying_partition_bitmap(ctx.uid);
- partitioned_meta.get_block_ids_bytes().await
+ partitioned_meta.get_block_ids_bytes()
}
pub async fn report_block_ids(&self, ctx: ReportBlocksContext) ->
Result<()> {
debug!("Report blocks: {:?}", ctx.clone());
let mut partitioned_meta =
self.get_underlying_partition_bitmap(ctx.uid);
- partitioned_meta.report_block_ids(ctx.blocks).await?;
+ partitioned_meta.report_block_ids(ctx.blocks)?;
Ok(())
}
@@ -277,7 +271,7 @@ impl App {
if shuffle_id.is_some() {
error!("Partial purge is not supported.");
} else {
- self.store.purge(app_id).await?;
+ self.store.purge(app_id).await?
}
Ok(())
@@ -688,7 +682,6 @@ mod test {
partition_id: 0,
},
})
- .await
.expect("TODO: panic message");
let deserialized = Treemap::deserialize(&data).unwrap();
diff --git a/rust/experimental/server/src/grpc.rs
b/rust/experimental/server/src/grpc.rs
index c07bef6b6..42f515046 100644
--- a/rust/experimental/server/src/grpc.rs
+++ b/rust/experimental/server/src/grpc.rs
@@ -428,7 +428,7 @@ impl ShuffleServer for DefaultShuffleServer {
},
blocks: partition_to_block_id.block_ids,
};
- let _ = app.report_block_ids(ctx).await;
+ let _ = app.report_block_ids(ctx);
}
Ok(Response::new(ReportShuffleResultResponse {
@@ -460,16 +460,9 @@ impl ShuffleServer for DefaultShuffleServer {
shuffle_id,
partition_id,
};
- let block_ids_result = app
- .unwrap()
- .get_block_ids(GetBlocksContext {
- uid: partition_id.clone(),
- })
- .instrument_await(format!(
- "getting shuffle blocks ids. uid: {:?}",
- &partition_id
- ))
- .await;
+ let block_ids_result = app.unwrap().get_block_ids(GetBlocksContext {
+ uid: partition_id.clone(),
+ });
if block_ids_result.is_err() {
let err_msg = block_ids_result.err();
@@ -511,15 +504,13 @@ impl ShuffleServer for DefaultShuffleServer {
let mut bytes_mut = BytesMut::new();
for partition_id in req.partitions {
- let block_ids_result = app
- .get_block_ids(GetBlocksContext {
- uid: PartitionedUId {
- app_id: app_id.clone(),
- shuffle_id,
- partition_id,
- },
- })
- .await;
+ let block_ids_result = app.get_block_ids(GetBlocksContext {
+ uid: PartitionedUId {
+ app_id: app_id.clone(),
+ shuffle_id,
+ partition_id,
+ },
+ });
if block_ids_result.is_err() {
let err_msg = block_ids_result.err();
error!(