This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new a98412ec8 [#1407] improvement(rust): Critical bug fix of getting
blockIds and some optimization (#1408)
a98412ec8 is described below
commit a98412ec8858ce8cebc977ab7b7f48366973cc63
Author: Junfan Zhang <[email protected]>
AuthorDate: Sat Dec 30 09:46:45 2023 +0800
[#1407] improvement(rust): Critical bug fix of getting blockIds and some
optimization (#1408)
### What changes were proposed in this pull request?
To improve the rust based server performance and fix some bugs
### Why are the changes needed?
- [x] fix the failure when getting blockIds
- [x] support unregister/finish shuffle grpc interface explicitly
- [x] fix the memory reserve when app is deleted
- [x] fix the buffer ticket reserve when app is deleted.
- [x] add test cases
- [x] fix the release bug: when sending partial data failed, the
allocated memory should be released
- [x] refactor ticket management
- [x] optimize the lock performance when high concurrency of writing
Fix: #1407
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
### Detailed commits log
All the commits below are squashed into one commit.
* feat: unify grpc service start to grpc runtime manager
* fix(rust): forget to call report blockIds method await
* feat: support unregister grpc interface
* fix: free unallocated size for one data sending
* feat: introduce the cap to constrain memory allocation
* feat: refactor ticket manager
* chore: update the latest benchmark with 1T terasort
* feat: use spin lock to get better latency
* chore(doc): add netty based server benchmark report
---
rust/experimental/server/Cargo.lock | 26 +-
rust/experimental/server/Cargo.toml | 3 +
rust/experimental/server/README.md | 51 ++--
rust/experimental/server/src/app.rs | 109 ++++---
rust/experimental/server/src/error.rs | 3 +
rust/experimental/server/src/grpc.rs | 76 ++++-
rust/experimental/server/src/lib.rs | 35 ++-
rust/experimental/server/src/main.rs | 17 +-
rust/experimental/server/src/mem_allocator/mod.rs | 6 +-
rust/experimental/server/src/store/hdfs.rs | 11 +-
rust/experimental/server/src/store/hybrid.rs | 51 ++--
rust/experimental/server/src/store/localfile.rs | 82 +++--
rust/experimental/server/src/store/mem/mod.rs | 30 +-
rust/experimental/server/src/store/mem/ticket.rs | 231 ++++++++++++++
rust/experimental/server/src/store/memory.rs | 350 +++++++---------------
rust/experimental/server/src/store/mod.rs | 9 +-
rust/experimental/server/tests/write_read.rs | 4 +
17 files changed, 687 insertions(+), 407 deletions(-)
diff --git a/rust/experimental/server/Cargo.lock
b/rust/experimental/server/Cargo.lock
index ec8ed2930..6c0466225 100644
--- a/rust/experimental/server/Cargo.lock
+++ b/rust/experimental/server/Cargo.lock
@@ -127,6 +127,12 @@ dependencies = [
"tracing",
]
+[[package]]
+name = "awaitility"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "46ee60785cbb3a23bde2c462098564a6337432b9fa032a62bb7ddb5e734c135d"
+
[[package]]
name = "axum"
version = "0.6.20"
@@ -265,6 +271,12 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223"
+[[package]]
+name = "cap"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6f125eb85b84a24c36b02ed1d22c9dd8632f53b3cde6e4d23512f94021030003"
+
[[package]]
name = "cassowary"
version = "0.3.0"
@@ -2243,7 +2255,7 @@ dependencies = [
"cc",
"libc",
"once_cell",
- "spin",
+ "spin 0.5.2",
"untrusted",
"web-sys",
"winapi",
@@ -2539,6 +2551,15 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
+[[package]]
+name = "spin"
+version = "0.9.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
+dependencies = [
+ "lock_api",
+]
+
[[package]]
name = "sse-codec"
version = "0.3.2"
@@ -3174,7 +3195,9 @@ dependencies = [
"async-channel",
"async-trait",
"await-tree",
+ "awaitility",
"bytes 1.5.0",
+ "cap",
"clap",
"console-subscriber",
"crc32fast",
@@ -3198,6 +3221,7 @@ dependencies = [
"serde",
"signal-hook",
"socket2 0.4.9",
+ "spin 0.9.8",
"tempdir",
"tempfile",
"thiserror",
diff --git a/rust/experimental/server/Cargo.toml
b/rust/experimental/server/Cargo.toml
index 62a784265..8c1f97b39 100644
--- a/rust/experimental/server/Cargo.toml
+++ b/rust/experimental/server/Cargo.toml
@@ -95,6 +95,8 @@ pin-project-lite = "0.2.8"
signal-hook = "0.3.17"
clap = "3.0.14"
socket2 = { version="0.4", features = ["all"]}
+cap = "0.1.2"
+spin = "0.9.8"
[dependencies.hdrs]
version = "0.3.0"
@@ -122,6 +124,7 @@ prost-build = "0.11.9"
[dev-dependencies]
env_logger = "0.10.0"
+awaitility = "0.3.1"
[profile.dev]
# re-enable debug assertions when pprof-rs fixed the reports for misaligned
pointer dereferences
diff --git a/rust/experimental/server/README.md
b/rust/experimental/server/README.md
index 52ef6a3a5..cc8d1ec8d 100644
--- a/rust/experimental/server/README.md
+++ b/rust/experimental/server/README.md
@@ -20,18 +20,20 @@ Another implementation of Apache Uniffle shuffle server
## Benchmark report
#### Environment
-_Software_: Uniffle 0.7.0 / Hadoop 3.2.2 / Spark 3.1.2
+
+_Software_: Uniffle 0.8.0 / Hadoop 3.2.2 / Spark 3.1.2
_Hardware_: Machine 96 cores, 512G memory, 1T * 4 SSD, network bandwidth 8GB/s
_Hadoop Yarn Cluster_: 1 * ResourceManager + 40 * NodeManager, every machine
4T * 4 HDD
-_Uniffle Cluster_: 1 * Coordinator + 5 * Shuffle Server, every machine 1T * 4
SSD
+_Uniffle Cluster_: 1 * Coordinator + 1 * Shuffle Server, every machine 1T * 4
NVME SSD
#### Configuration
+
spark's conf
-```
-spark.executor.instances 100
+```yaml
+spark.executor.instances 400
spark.executor.cores 1
spark.executor.memory 2g
spark.shuffle.manager org.apache.spark.shuffle.RssShuffleManager
@@ -39,42 +41,51 @@ spark.rss.storage.type MEMORY_LOCALFILE
```
uniffle grpc-based server's conf
-```
-JVM XMX=130g
+``` yaml
+JVM XMX=30g
+
+# JDK11 + G1
-...
-rss.server.buffer.capacity 100g
-rss.server.read.buffer.capacity 20g
+rss.server.buffer.capacity 10g
+rss.server.read.buffer.capacity 10g
rss.server.flush.thread.alive 10
rss.server.flush.threadPool.size 50
rss.server.high.watermark.write 80
rss.server.low.watermark.write 70
-...
```
-uniffle-x(Rust-based)'s conf
+Rust-based shuffle-server conf
```
store_type = "MEMORY_LOCALFILE"
[memory_store]
-capacity = "100G"
+capacity = "10G"
[localfile_store]
data_paths = ["/data1/uniffle", "/data2/uniffle", "/data3/uniffle",
"/data4/uniffle"]
healthy_check_min_disks = 0
[hybrid_store]
-memory_spill_high_watermark = 0.5
-memory_spill_low_watermark = 0.4
+memory_spill_high_watermark = 0.8
+memory_spill_low_watermark = 0.7
```
-#### Tera Sort
-| type | 100G | 1T | 5T
(run with 400 executors) |
-|------------------------------|:----------------:|:---------------------:|:---------------------------:|
-| vanilla uniffle (grpc-based) | 1.4min (29s/53s) | 12min (4.7min/7.0min) |
18.7min(12min/6.7min) |
-| uniffle-x | 1.3min (28s/48s) | 11min (4.3min/6.5min) |
14min(7.8min/6.2min) |
+#### TeraSort cost times
+| type/buffer capacity | 250G (compressed) |
comment |
+|--------------------------------------|:------------------:|:--------------------------------------------------------:|
+| vanilla uniffle (grpc-based) / 10g | 5.3min (2.3m/3m) |
1.9G/s |
+| vanilla uniffle (grpc-based) / 300g | 5.6min (3.7m/1.9m) | GC
occurs frequently / 2.5G/s |
+| vanilla uniffle (netty-based) / 10g | / | read failed.
2.5G/s (write is better due to zero copy) |
+| vanilla uniffle (netty-based) / 300g | / |
app hang |
+| rust based shuffle server / 10g | 4.6min (2.2m/2.4m) |
2.0 G/s |
+| rust based shuffle server / 300g | 4min (1.5m/2.5m) |
3.5 G/s |
+
+
+Compared with grpc based server, rust-based server has less memory footprint
and stable performance.
+
+And Netty is still not stable for production env.
-> Tips: When running 5T on vanilla Uniffle, data sent timeouts may occur, and
there can be occasional failures in client fetching the block bitmap.
+In the future, rust-based server will use io_uring mechanism to improve
writing performance.
## Build
diff --git a/rust/experimental/server/src/app.rs
b/rust/experimental/server/src/app.rs
index d75f8f969..9298fb6e3 100644
--- a/rust/experimental/server/src/app.rs
+++ b/rust/experimental/server/src/app.rs
@@ -31,7 +31,7 @@ use crate::store::{
StoreProvider,
};
use crate::util::current_timestamp_sec;
-use anyhow::Result;
+use anyhow::{anyhow, Result};
use bytes::Bytes;
use croaring::treemap::JvmSerializer;
use croaring::Treemap;
@@ -173,13 +173,15 @@ impl App {
Ok(())
}
- pub async fn insert(&self, ctx: WritingViewContext) -> Result<(),
WorkerError> {
+ pub async fn insert(&self, ctx: WritingViewContext) -> Result<i32,
WorkerError> {
let len: i32 = ctx.data_blocks.iter().map(|block| block.length).sum();
self.get_underlying_partition_bitmap(ctx.uid.clone())
.incr_data_size(len)?;
TOTAL_RECEIVED_DATA.inc_by(len as u64);
- self.store.insert(ctx).await
+ self.store.insert(ctx).await?;
+
+ Ok(len)
}
pub async fn select(&self, ctx: ReadingViewContext) ->
Result<ResponseData, WorkerError> {
@@ -231,13 +233,8 @@ impl App {
}
}
- pub fn is_buffer_ticket_exist(&self, ticket_id: i64) -> bool {
- self.store
- .is_buffer_ticket_exist(self.app_id.as_str(), ticket_id)
- }
-
- pub fn discard_tickets(&self, ticket_id: i64) -> i64 {
- self.store.discard_tickets(self.app_id.as_str(), ticket_id)
+ pub async fn free_allocated_memory_size(&self, size: i64) -> Result<bool> {
+ self.store.free_hot_store_allocated_memory_size(size).await
}
pub async fn require_buffer(
@@ -252,6 +249,12 @@ impl App {
self.store.require_buffer(ctx).await
}
+ pub async fn release_buffer(&self, ticket_id: i64) -> Result<i64,
WorkerError> {
+ self.store
+ .release_buffer(ReleaseBufferContext::from(ticket_id))
+ .await
+ }
+
fn get_underlying_partition_bitmap(&self, uid: PartitionedUId) ->
PartitionedMeta {
let shuffle_id = uid.shuffle_id;
let partition_id = uid.partition_id;
@@ -277,13 +280,30 @@ impl App {
}
pub async fn purge(&self, app_id: String, shuffle_id: Option<i32>) ->
Result<()> {
- if shuffle_id.is_some() {
- error!("Partial purge is not supported.");
- } else {
- self.store.purge(app_id).await?
- }
+ self.store
+ .purge(PurgeDataContext::new(app_id, shuffle_id))
+ .await
+ }
+}
- Ok(())
+#[derive(Debug, Clone)]
+pub struct PurgeDataContext {
+ pub(crate) app_id: String,
+ pub(crate) shuffle_id: Option<i32>,
+}
+
+impl PurgeDataContext {
+ pub fn new(app_id: String, shuffle_id: Option<i32>) -> PurgeDataContext {
+ PurgeDataContext { app_id, shuffle_id }
+ }
+}
+
+impl From<&str> for PurgeDataContext {
+ fn from(app_id_ref: &str) -> Self {
+ PurgeDataContext {
+ app_id: app_id_ref.to_string(),
+ shuffle_id: None,
+ }
}
}
@@ -321,6 +341,17 @@ pub struct RequireBufferContext {
pub size: i64,
}
+#[derive(Debug, Clone)]
+pub struct ReleaseBufferContext {
+ pub(crate) ticket_id: i64,
+}
+
+impl From<i64> for ReleaseBufferContext {
+ fn from(value: i64) -> Self {
+ Self { ticket_id: value }
+ }
+}
+
impl RequireBufferContext {
pub fn new(uid: PartitionedUId, size: i64) -> Self {
Self { uid, size }
@@ -343,7 +374,7 @@ pub enum PurgeEvent {
// app_id
HEART_BEAT_TIMEOUT(String),
// app_id + shuffle_id
- APP_PARTIAL_SHUFFLES_PURGE(String, Vec<i32>),
+ APP_PARTIAL_SHUFFLES_PURGE(String, i32),
// app_id
APP_PURGE(String),
}
@@ -424,18 +455,18 @@ impl AppManager {
"The app:[{}]'s data will be purged due to
heartbeat timeout",
&app_id
);
- app_manager_cloned.purge_app_data(app_id).await
+ app_manager_cloned.purge_app_data(app_id, None).await
}
PurgeEvent::APP_PURGE(app_id) => {
info!(
"The app:[{}] has been finished, its data will be
purged.",
&app_id
);
- app_manager_cloned.purge_app_data(app_id).await
+ app_manager_cloned.purge_app_data(app_id, None).await
}
- PurgeEvent::APP_PARTIAL_SHUFFLES_PURGE(_app_id,
_shuffle_ids) => {
- info!("Partial data purge is not supported currently");
- Ok(())
+ PurgeEvent::APP_PARTIAL_SHUFFLES_PURGE(app_id, shuffle_id)
=> {
+ info!("The app:[{:?}] with shuffleId: [{:?}] will be
purged due to unregister grpc interface", &app_id, shuffle_id);
+ app_manager_cloned.purge_app_data(app_id,
Some(shuffle_id)).await
}
}
.map_err(|err| error!("Errors on purging data. error: {:?}",
err));
@@ -457,19 +488,16 @@ impl AppManager {
self.store.memory_spill_event_num()
}
- async fn purge_app_data(&self, app_id: String) -> Result<()> {
- let app = self.get_app(&app_id);
- if app.is_none() {
- error!(
- "App:{} don't exist when purging data, this should not happen",
- &app_id
- );
- } else {
- let app = app.unwrap();
- app.purge(app_id.clone(), None).await?;
- }
+ async fn purge_app_data(&self, app_id: String, shuffle_id_option:
Option<i32>) -> Result<()> {
+ let app = self.get_app(&app_id).ok_or(anyhow!(format!(
+ "App:{} don't exist when purging data, this should not happen",
+ &app_id
+ )))?;
+ app.purge(app_id.clone(), shuffle_id_option).await?;
- self.apps.remove(&app_id);
+ if shuffle_id_option.is_none() {
+ self.apps.remove(&app_id);
+ }
Ok(())
}
@@ -520,13 +548,10 @@ impl AppManager {
app_ref.register_shuffle(shuffle_id)
}
- pub async fn unregister(&self, app_id: String, shuffle_ids:
Option<Vec<i32>>) -> Result<()> {
- let event = match shuffle_ids {
- Some(ids) => PurgeEvent::APP_PARTIAL_SHUFFLES_PURGE(app_id, ids),
- _ => PurgeEvent::APP_PURGE(app_id),
- };
-
- self.sender.send(event).await?;
+ pub async fn unregister(&self, app_id: String, shuffle_id: i32) ->
Result<()> {
+ self.sender
+ .send(PurgeEvent::APP_PARTIAL_SHUFFLES_PURGE(app_id, shuffle_id))
+ .await?;
Ok(())
}
}
@@ -649,7 +674,7 @@ mod test {
// case3: purge
app_manager_ref
- .purge_app_data(app_id.to_string())
+ .purge_app_data(app_id.to_string(), None)
.await
.expect("");
diff --git a/rust/experimental/server/src/error.rs
b/rust/experimental/server/src/error.rs
index 0f411af0c..cc07536a5 100644
--- a/rust/experimental/server/src/error.rs
+++ b/rust/experimental/server/src/error.rs
@@ -48,6 +48,9 @@ pub enum WorkerError {
#[error("Http request failed. {0}")]
HTTP_SERVICE_ERROR(String),
+
+ #[error("Ticket id: {0} not exist")]
+ TICKET_ID_NOT_EXIST(i64),
}
impl From<AcquireError> for WorkerError {
diff --git a/rust/experimental/server/src/grpc.rs
b/rust/experimental/server/src/grpc.rs
index 0c9730522..7c784c245 100644
--- a/rust/experimental/server/src/grpc.rs
+++ b/rust/experimental/server/src/grpc.rs
@@ -103,12 +103,24 @@ impl ShuffleServer for DefaultShuffleServer {
async fn unregister_shuffle(
&self,
- _request: Request<ShuffleUnregisterRequest>,
+ request: Request<ShuffleUnregisterRequest>,
) -> Result<Response<ShuffleUnregisterResponse>, Status> {
- // todo: implement shuffle level deletion
- info!("Accepted unregister shuffle info....");
+ let request = request.into_inner();
+ let shuffle_id = request.shuffle_id;
+ let app_id = request.app_id;
+
+ info!(
+ "Accepted unregister shuffle info for [app:{:?}, shuffle_id:{:?}]",
+ &app_id, shuffle_id
+ );
+ let status_code = self
+ .app_manager_ref
+ .unregister(app_id, shuffle_id)
+ .await
+ .map_or_else(|_e| StatusCode::INTERNAL_ERROR, |_|
StatusCode::SUCCESS);
+
Ok(Response::new(ShuffleUnregisterResponse {
- status: StatusCode::SUCCESS.into(),
+ status: status_code.into(),
ret_msg: "".to_string(),
}))
}
@@ -137,15 +149,16 @@ impl ShuffleServer for DefaultShuffleServer {
let app = app_option.unwrap();
- if !app.is_buffer_ticket_exist(ticket_id) {
+ let release_result = app.release_buffer(ticket_id).await;
+ if release_result.is_err() {
return Ok(Response::new(SendShuffleDataResponse {
status: StatusCode::NO_BUFFER.into(),
ret_msg: "No such buffer ticket id, it may be discarded due to
timeout".to_string(),
}));
- } else {
- app.discard_tickets(ticket_id);
}
+ let ticket_required_size = release_result.unwrap();
+
let mut blocks_map = HashMap::new();
for shuffle_data in req.shuffle_data {
let data: PartitionedData = shuffle_data.into();
@@ -156,7 +169,15 @@ impl ShuffleServer for DefaultShuffleServer {
blocks.extend(partitioned_blocks);
}
+ let mut inserted_failure_occurs = false;
+ let mut inserted_failure_error = None;
+ let mut inserted_total_size = 0;
+
for (partition_id, blocks) in blocks_map.into_iter() {
+ if inserted_failure_occurs {
+ continue;
+ }
+
let uid = PartitionedUId {
app_id: app_id.clone(),
shuffle_id,
@@ -178,13 +199,35 @@ impl ShuffleServer for DefaultShuffleServer {
inserted.err()
);
error!("{}", &err);
- return Ok(Response::new(SendShuffleDataResponse {
- status: StatusCode::INTERNAL_ERROR.into(),
- ret_msg: err,
- }));
+
+ inserted_failure_error = Some(err);
+ inserted_failure_occurs = true;
+ continue;
+ }
+
+ let inserted_size = inserted.unwrap();
+ inserted_total_size += inserted_size as i64;
+ }
+
+ let unused_allocated_size = ticket_required_size - inserted_total_size;
+ if unused_allocated_size != 0 {
+ debug!("The required buffer size:[{:?}] has remaining allocated
size:[{:?}] of unused, this should not happen",
+ ticket_required_size, unused_allocated_size);
+ if let Err(e) =
app.free_allocated_memory_size(unused_allocated_size).await {
+ warn!(
+ "Errors on free allocated size: {:?} for app: {:?}. err:
{:#?}",
+ unused_allocated_size, &app_id, e
+ );
}
}
+ if inserted_failure_occurs {
+ return Ok(Response::new(SendShuffleDataResponse {
+ status: StatusCode::INTERNAL_ERROR.into(),
+ ret_msg: inserted_failure_error.unwrap(),
+ }));
+ }
+
timer.observe_duration();
Ok(Response::new(SendShuffleDataResponse {
@@ -446,7 +489,16 @@ impl ShuffleServer for DefaultShuffleServer {
},
blocks: partition_to_block_id.block_ids,
};
- let _ = app.report_block_ids(ctx);
+
+ match app.report_block_ids(ctx).await {
+ Err(e) => {
+ return Ok(Response::new(ReportShuffleResultResponse {
+ status: StatusCode::INTERNAL_ERROR.into(),
+ ret_msg: e.to_string(),
+ }))
+ }
+ _ => (),
+ }
}
Ok(Response::new(ReportShuffleResultResponse {
diff --git a/rust/experimental/server/src/lib.rs
b/rust/experimental/server/src/lib.rs
index 4cf340392..1219285b1 100644
--- a/rust/experimental/server/src/lib.rs
+++ b/rust/experimental/server/src/lib.rs
@@ -40,13 +40,15 @@ use
crate::proto::uniffle::shuffle_server_client::ShuffleServerClient;
use crate::proto::uniffle::shuffle_server_server::ShuffleServerServer;
use crate::proto::uniffle::{
GetLocalShuffleDataRequest, GetLocalShuffleIndexRequest,
GetMemoryShuffleDataRequest,
- RequireBufferRequest, SendShuffleDataRequest, ShuffleBlock, ShuffleData,
- ShuffleRegisterRequest,
+ GetShuffleResultRequest, PartitionToBlockIds, ReportShuffleResultRequest,
RequireBufferRequest,
+ SendShuffleDataRequest, ShuffleBlock, ShuffleData, ShuffleRegisterRequest,
};
use crate::runtime::manager::RuntimeManager;
use crate::util::gen_worker_uid;
use anyhow::Result;
use bytes::{Buf, Bytes, BytesMut};
+use croaring::treemap::JvmSerializer;
+use croaring::Treemap;
use log::info;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
@@ -164,6 +166,20 @@ pub async fn write_read_for_one_time(mut client:
ShuffleServerClient<Channel>) -
let response = response.into_inner();
assert_eq!(0, response.status);
+
+ // report the finished block ids
+ client
+ .report_shuffle_result(ReportShuffleResultRequest {
+ app_id: app_id.clone(),
+ shuffle_id: 0,
+ task_attempt_id: 0,
+ bitmap_num: 0,
+ partition_to_block_ids: vec![PartitionToBlockIds {
+ partition_id: idx,
+ block_ids: vec![idx as i64],
+ }],
+ })
+ .await?;
}
tokio::time::sleep(Duration::from_secs(1)).await;
@@ -173,6 +189,21 @@ pub async fn write_read_for_one_time(mut client:
ShuffleServerClient<Channel>) -
// firstly. read from the memory
for idx in 0..batch_size {
+ let block_id_result = client
+ .get_shuffle_result(GetShuffleResultRequest {
+ app_id: app_id.clone(),
+ shuffle_id: 0,
+ partition_id: idx,
+ })
+ .await?
+ .into_inner();
+
+ assert_eq!(0, block_id_result.status);
+
+ let block_id_bitmap =
Treemap::deserialize(&*block_id_result.serialized_bitmap)?;
+ assert_eq!(1, block_id_bitmap.iter().count());
+ assert!(block_id_bitmap.contains(idx as u64));
+
let response_data = client
.get_memory_shuffle_data(GetMemoryShuffleDataRequest {
app_id: app_id.clone(),
diff --git a/rust/experimental/server/src/main.rs
b/rust/experimental/server/src/main.rs
index f8cc817c4..d2fafa4d4 100644
--- a/rust/experimental/server/src/main.rs
+++ b/rust/experimental/server/src/main.rs
@@ -32,10 +32,13 @@ use crate::runtime::manager::RuntimeManager;
use crate::signal::details::graceful_wait_for_signal;
use crate::util::{gen_worker_uid, get_local_ip};
+use crate::mem_allocator::ALLOCATOR;
+use crate::readable_size::ReadableSize;
use anyhow::Result;
use clap::{App, Arg};
use log::{debug, error, info};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+use std::str::FromStr;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::sync::broadcast;
@@ -60,6 +63,7 @@ pub mod signal;
pub mod store;
mod util;
+const MAX_MEMORY_ALLOCATION_SIZE_ENV_KEY: &str = "MAX_MEMORY_ALLOCATION_SIZE";
const DEFAULT_SHUFFLE_SERVER_TAG: &str = "ss_v4";
fn start_coordinator_report(
@@ -170,6 +174,8 @@ fn init_log(log: &LogConfig) -> WorkerGuard {
}
fn main() -> Result<()> {
+ setup_max_memory_allocation();
+
let args_match = App::new("Uniffle Worker")
.version("0.9.0-SNAPSHOT")
.about("Rust based shuffle server for Apache Uniffle")
@@ -232,7 +238,7 @@ fn main() -> Result<()> {
.max_decoding_message_size(usize::MAX)
.max_encoding_message_size(usize::MAX);
let service_tx = tx.subscribe();
- std::thread::spawn(move || {
+ runtime_manager.grpc_runtime.spawn_blocking(move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
@@ -246,6 +252,13 @@ fn main() -> Result<()> {
Ok(())
}
+fn setup_max_memory_allocation() {
+ let _ = std::env::var(MAX_MEMORY_ALLOCATION_SIZE_ENV_KEY).map(|v| {
+ let readable_size = ReadableSize::from_str(v.as_str()).unwrap();
+ ALLOCATOR.set_limit(readable_size.as_bytes() as usize)
+ });
+}
+
async fn grpc_serve(
service: ShuffleServerServer<DefaultShuffleServer>,
addr: SocketAddr,
@@ -283,7 +296,7 @@ async fn grpc_serve(
if let Err(err) = rx.recv().await {
error!("Errors on stopping the GRPC service, err: {:?}.", err);
} else {
- info!("GRPC service has been graceful stopped.");
+ debug!("GRPC service has been graceful stopped.");
}
})
.await
diff --git a/rust/experimental/server/src/mem_allocator/mod.rs
b/rust/experimental/server/src/mem_allocator/mod.rs
index 6751643ee..d075f4ea6 100644
--- a/rust/experimental/server/src/mem_allocator/mod.rs
+++ b/rust/experimental/server/src/mem_allocator/mod.rs
@@ -24,9 +24,10 @@ mod imp;
#[path = "system_std.rs"]
mod imp;
-// set default allocator
+use cap::Cap;
+
#[global_allocator]
-static ALLOC: imp::Allocator = imp::allocator();
+pub static ALLOCATOR: Cap<imp::Allocator> = Cap::new(imp::allocator(),
usize::max_value());
pub mod error;
pub type AllocStats = Vec<(&'static str, usize)>;
@@ -34,6 +35,7 @@ pub type AllocStats = Vec<(&'static str, usize)>;
// when memory-prof feature is enabled, provide empty profiling functions
#[cfg(not(all(unix, feature = "memory-prof")))]
mod default;
+
#[cfg(not(all(unix, feature = "memory-prof")))]
pub use default::*;
diff --git a/rust/experimental/server/src/store/hdfs.rs
b/rust/experimental/server/src/store/hdfs.rs
index f783b379e..c36af4732 100644
--- a/rust/experimental/server/src/store/hdfs.rs
+++ b/rust/experimental/server/src/store/hdfs.rs
@@ -16,8 +16,8 @@
// under the License.
use crate::app::{
- PartitionedUId, ReadingIndexViewContext, ReadingViewContext,
RequireBufferContext,
- WritingViewContext,
+ PartitionedUId, PurgeDataContext, ReadingIndexViewContext,
ReadingViewContext,
+ ReleaseBufferContext, RequireBufferContext, WritingViewContext,
};
use crate::config::HdfsStoreConfig;
use crate::error::WorkerError;
@@ -246,7 +246,12 @@ impl Store for HdfsStore {
todo!()
}
- async fn purge(&self, app_id: String) -> Result<()> {
+ async fn release_buffer(&self, _ctx: ReleaseBufferContext) -> Result<i64,
WorkerError> {
+ todo!()
+ }
+
+ async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
+ let app_id = ctx.app_id;
let app_dir = self.get_app_dir(app_id.as_str());
let keys_to_delete: Vec<_> = self
diff --git a/rust/experimental/server/src/store/hybrid.rs
b/rust/experimental/server/src/store/hybrid.rs
index d81314212..32be53511 100644
--- a/rust/experimental/server/src/store/hybrid.rs
+++ b/rust/experimental/server/src/store/hybrid.rs
@@ -16,8 +16,8 @@
// under the License.
use crate::app::{
- PartitionedUId, ReadingIndexViewContext, ReadingOptions,
ReadingViewContext,
- RequireBufferContext, WritingViewContext,
+ PartitionedUId, PurgeDataContext, ReadingIndexViewContext, ReadingOptions,
ReadingViewContext,
+ ReleaseBufferContext, RequireBufferContext, WritingViewContext,
};
use crate::await_tree::AWAIT_TREE_REGISTRY;
@@ -47,11 +47,12 @@ use std::any::Any;
use std::collections::VecDeque;
use await_tree::InstrumentAwait;
+use spin::mutex::Mutex;
use std::str::FromStr;
use std::sync::Arc;
use crate::runtime::manager::RuntimeManager;
-use tokio::sync::{Mutex, Semaphore};
+use tokio::sync::Semaphore;
trait PersistentStore: Store + Persistent + Send + Sync {}
impl PersistentStore for LocalFileStore {}
@@ -246,7 +247,7 @@ impl HybridStore {
self.hot_store
.release_in_flight_blocks_in_underlying_staging_buffer(uid,
in_flight_blocks_id)
.await?;
- self.hot_store.free_memory(spill_size).await?;
+ self.hot_store.free_used(spill_size).await?;
match self.get_store_type(candidate_store) {
StorageType::LOCALFILE => {
@@ -261,13 +262,8 @@ impl HybridStore {
Ok(message)
}
- // For app to check the ticket allocated existence and discard if it has
been used
- pub fn is_buffer_ticket_exist(&self, app_id: &str, ticket_id: i64) -> bool
{
- self.hot_store.is_ticket_exist(app_id, ticket_id)
- }
-
- pub fn discard_tickets(&self, app_id: &str, ticket_id: i64) -> i64 {
- self.hot_store.discard_tickets(app_id, Some(ticket_id))
+ pub async fn free_hot_store_allocated_memory_size(&self, size: i64) ->
Result<bool> {
+ self.hot_store.free_allocated(size).await
}
pub async fn get_hot_store_memory_snapshot(&self) ->
Result<MemorySnapshot> {
@@ -404,7 +400,7 @@ impl Store for HybridStore {
let buffer = self
.hot_store
.get_or_create_underlying_staging_buffer(uid.clone());
- let mut buffer_inner = buffer.lock().await;
+ let mut buffer_inner = buffer.lock();
if size.as_bytes() < buffer_inner.get_staging_size()? as u64 {
let (in_flight_uid, blocks) =
buffer_inner.migrate_staging_to_in_flight()?;
self.make_memory_buffer_flush(in_flight_uid, blocks,
uid.clone())
@@ -415,7 +411,7 @@ impl Store for HybridStore {
// if the used size exceed the ratio of high watermark,
// then send watermark flush trigger
- if let Ok(_lock) = self.memory_spill_lock.try_lock() {
+ if let Some(_lock) = self.memory_spill_lock.try_lock() {
let used_ratio = self.hot_store.memory_used_ratio().await;
if used_ratio > self.config.memory_spill_high_watermark {
if let Err(e) =
self.memory_watermark_flush_trigger_sender.send(()).await {
@@ -457,24 +453,21 @@ impl Store for HybridStore {
.await
}
- async fn purge(&self, app_id: String) -> Result<()> {
- self.hot_store.purge(app_id.clone()).await?;
- info!("Removed data of app:[{}] in hot store", &app_id);
+ async fn release_buffer(&self, ctx: ReleaseBufferContext) -> Result<i64,
WorkerError> {
+ self.hot_store.release_buffer(ctx).await
+ }
+
+ async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
+ let app_id = &ctx.app_id;
+ self.hot_store.purge(ctx.clone()).await?;
+ info!("Removed data of app:[{}] in hot store", app_id);
if self.warm_store.is_some() {
- self.warm_store
- .as_ref()
- .unwrap()
- .purge(app_id.clone())
- .await?;
- info!("Removed data of app:[{}] in warm store", &app_id);
+ self.warm_store.as_ref().unwrap().purge(ctx.clone()).await?;
+ info!("Removed data of app:[{}] in warm store", app_id);
}
if self.cold_store.is_some() {
- self.cold_store
- .as_ref()
- .unwrap()
- .purge(app_id.clone())
- .await?;
- info!("Removed data of app:[{}] in cold store", &app_id);
+ self.cold_store.as_ref().unwrap().purge(ctx.clone()).await?;
+ info!("Removed data of app:[{}] in cold store", app_id);
}
Ok(())
}
@@ -511,7 +504,7 @@ pub async fn watermark_flush(store: Arc<HybridStore>) ->
Result<()> {
let mut flushed_size = 0u64;
for (partition_id, buffer) in buffers {
- let mut buffer_inner = buffer.lock().await;
+ let mut buffer_inner = buffer.lock();
let (in_flight_uid, blocks) =
buffer_inner.migrate_staging_to_in_flight()?;
drop(buffer_inner);
for block in &blocks {
diff --git a/rust/experimental/server/src/store/localfile.rs
b/rust/experimental/server/src/store/localfile.rs
index 34bd9d97b..e527d8cdb 100644
--- a/rust/experimental/server/src/store/localfile.rs
+++ b/rust/experimental/server/src/store/localfile.rs
@@ -17,8 +17,8 @@
use crate::app::ReadingOptions::FILE_OFFSET_AND_LEN;
use crate::app::{
- PartitionedUId, ReadingIndexViewContext, ReadingViewContext,
RequireBufferContext,
- WritingViewContext,
+ PartitionedUId, PurgeDataContext, ReadingIndexViewContext,
ReadingViewContext,
+ ReleaseBufferContext, RequireBufferContext, WritingViewContext,
};
use crate::config::LocalfileStoreConfig;
use crate::error::WorkerError;
@@ -112,6 +112,10 @@ impl LocalFileStore {
format!("{}", app_id)
}
+ fn gen_relative_path_for_shuffle(app_id: &str, shuffle_id: i32) -> String {
+ format!("{}/{}", app_id, shuffle_id)
+ }
+
fn gen_relative_path_for_partition(uid: &PartitionedUId) -> (String,
String) {
(
format!(
@@ -426,32 +430,44 @@ impl Store for LocalFileStore {
todo!()
}
- async fn purge(&self, app_id: String) -> Result<()> {
- let app_relative_dir_path =
LocalFileStore::gen_relative_path_for_app(&app_id);
+ async fn release_buffer(&self, _ctx: ReleaseBufferContext) -> Result<i64,
WorkerError> {
+ todo!()
+ }
- let all_partition_ids = self.get_app_all_partitions(&app_id);
- if all_partition_ids.is_empty() {
- return Ok(());
- }
+ async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
+ let app_id = ctx.app_id;
+ let shuffle_id_option = ctx.shuffle_id;
+
+ let data_relative_dir_path = match shuffle_id_option {
+ Some(shuffle_id) =>
LocalFileStore::gen_relative_path_for_shuffle(&app_id, shuffle_id),
+ _ => LocalFileStore::gen_relative_path_for_app(&app_id),
+ };
for local_disk_ref in &self.local_disks {
let disk = local_disk_ref.clone();
- disk.delete(app_relative_dir_path.to_string()).await?;
+ disk.delete(data_relative_dir_path.to_string()).await?;
}
- for (shuffle_id, partition_id) in all_partition_ids.into_iter() {
- // delete lock
- let uid = PartitionedUId {
- app_id: app_id.clone(),
- shuffle_id,
- partition_id,
- };
- let (data_file_path, _) =
LocalFileStore::gen_relative_path_for_partition(&uid);
- self.partition_file_locks.remove(&data_file_path);
- }
+ if shuffle_id_option.is_none() {
+ let all_partition_ids = self.get_app_all_partitions(&app_id);
+ if all_partition_ids.is_empty() {
+ return Ok(());
+ }
- // delete disk mapping
- self.delete_app(&app_id)?;
+ for (shuffle_id, partition_id) in all_partition_ids.into_iter() {
+ // delete lock
+ let uid = PartitionedUId {
+ app_id: app_id.clone(),
+ shuffle_id,
+ partition_id,
+ };
+ let (data_file_path, _) =
LocalFileStore::gen_relative_path_for_partition(&uid);
+ self.partition_file_locks.remove(&data_file_path);
+ }
+
+ // delete disk mapping
+ self.delete_app(&app_id)?;
+ }
Ok(())
}
@@ -739,8 +755,8 @@ impl LocalDisk {
#[cfg(test)]
mod test {
use crate::app::{
- PartitionedUId, ReadingIndexViewContext, ReadingOptions,
ReadingViewContext,
- WritingViewContext,
+ PartitionedUId, PurgeDataContext, ReadingIndexViewContext,
ReadingOptions,
+ ReadingViewContext, WritingViewContext,
};
use crate::store::localfile::{LocalDisk, LocalDiskConfig, LocalFileStore};
@@ -805,11 +821,29 @@ mod test {
&temp_path, &app_id, "0", "0"
)))?
);
- runtime.wait(local_store.purge(app_id.clone()))?;
+
+ // shuffle level purge
+ runtime
+ .wait(local_store.purge(PurgeDataContext::new(app_id.to_string(),
Some(0))))
+ .expect("");
+ assert_eq!(
+ false,
+ runtime.wait(tokio::fs::try_exists(format!(
+ "{}/{}/{}",
+ &temp_path, &app_id, 0
+ )))?
+ );
+
+ // app level purge
+ runtime.wait(local_store.purge((&*app_id).into()))?;
assert_eq!(
false,
runtime.wait(tokio::fs::try_exists(format!("{}/{}", &temp_path,
&app_id)))?
);
+ assert!(!local_store
+ .partition_file_locks
+ .contains_key(&format!("{}/{}/{}/{}.data", &temp_path, &app_id, 0,
0)));
+ assert!(!local_store.partition_written_disk_map.contains_key(&app_id));
Ok(())
}
diff --git a/rust/experimental/server/src/store/mem/mod.rs
b/rust/experimental/server/src/store/mem/mod.rs
index cae795981..84061067d 100644
--- a/rust/experimental/server/src/store/mem/mod.rs
+++ b/rust/experimental/server/src/store/mem/mod.rs
@@ -15,32 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-pub use await_tree::InstrumentAwait;
-
-pub struct MemoryBufferTicket {
- id: i64,
- created_time: u64,
- size: i64,
-}
-
-impl MemoryBufferTicket {
- pub fn new(id: i64, created_time: u64, size: i64) -> Self {
- Self {
- id,
- created_time,
- size,
- }
- }
+pub mod ticket;
- pub fn get_size(&self) -> i64 {
- self.size
- }
-
- pub fn is_timeout(&self, timeout_sec: i64) -> bool {
- crate::util::current_timestamp_sec() - self.created_time > timeout_sec
as u64
- }
-
- pub fn get_id(&self) -> i64 {
- self.id
- }
-}
+pub use await_tree::InstrumentAwait;
diff --git a/rust/experimental/server/src/store/mem/ticket.rs
b/rust/experimental/server/src/store/mem/ticket.rs
new file mode 100644
index 000000000..eaad3796e
--- /dev/null
+++ b/rust/experimental/server/src/store/mem/ticket.rs
@@ -0,0 +1,231 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::WorkerError;
+use crate::runtime::manager::RuntimeManager;
+use anyhow::Result;
+use dashmap::DashMap;
+use log::debug;
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+
+#[derive(Clone)]
+pub struct Ticket {
+ id: i64,
+ created_time: u64,
+ size: i64,
+ owned_by_app_id: String,
+}
+
+impl Ticket {
+ pub fn new(ticket_id: i64, created_time: u64, size: i64, app_id: &str) ->
Self {
+ Self {
+ id: ticket_id,
+ created_time,
+ size,
+ owned_by_app_id: app_id.into(),
+ }
+ }
+
+ pub fn get_size(&self) -> i64 {
+ self.size
+ }
+
+ pub fn is_timeout(&self, timeout_sec: i64) -> bool {
+ crate::util::current_timestamp_sec() - self.created_time > timeout_sec
as u64
+ }
+
+ pub fn get_id(&self) -> i64 {
+ self.id
+ }
+}
+
+#[derive(Clone)]
+pub struct TicketManager {
+ // key: ticket_id
+ ticket_store: Arc<DashMap<i64, Ticket>>,
+
+ ticket_timeout_sec: i64,
+ ticket_timeout_check_interval_sec: i64,
+}
+
+impl TicketManager {
+ pub fn new<F: FnMut(i64) -> bool + Send + 'static>(
+ ticket_timeout_sec: i64,
+ ticket_timeout_check_interval_sec: i64,
+ free_allocated_size_func: F,
+ runtime_manager: RuntimeManager,
+ ) -> Self {
+ let manager = Self {
+ ticket_store: Default::default(),
+ ticket_timeout_sec,
+ ticket_timeout_check_interval_sec,
+ };
+ Self::schedule_ticket_check(manager.clone(), free_allocated_size_func,
runtime_manager);
+ manager
+ }
+
+ /// Check whether a ticket with the given id exists.
+ pub fn exist(&self, ticket_id: i64) -> bool {
+ self.ticket_store.contains_key(&ticket_id)
+ }
+
+ /// Delete one ticket by its id and return the allocated size for
this ticket.
+ pub fn delete(&self, ticket_id: i64) -> Result<i64, WorkerError> {
+ if let Some(entry) = self.ticket_store.remove(&ticket_id) {
+ Ok(entry.1.size)
+ } else {
+ Err(WorkerError::TICKET_ID_NOT_EXIST(ticket_id))
+ }
+ }
+
+ /// Delete all the tickets owned by the given app id, and
+ /// return the total allocated size of the tickets owned by this
app_id.
+ pub fn delete_by_app_id(&self, app_id: &str) -> i64 {
+ let read_view = self.ticket_store.clone();
+ let mut deleted_ids = vec![];
+ for ticket in read_view.iter() {
+ if ticket.owned_by_app_id == *app_id {
+ deleted_ids.push(ticket.id);
+ }
+ }
+
+ let mut size = 0i64;
+ for deleted_id in deleted_ids {
+ size += self
+ .ticket_store
+ .remove(&deleted_id)
+ .map_or(0, |val| val.1.size);
+ }
+ size
+ }
+
+ /// Insert one ticket to be managed by this ticket manager.
+ pub fn insert(&self, ticket_id: i64, size: i64, created_timestamp: u64,
app_id: &str) -> bool {
+ let ticket = Ticket {
+ id: ticket_id,
+ created_time: created_timestamp,
+ size,
+ owned_by_app_id: app_id.into(),
+ };
+
+ self.ticket_store
+ .insert(ticket_id, ticket)
+ .map_or(false, |_| true)
+ }
+
+ fn schedule_ticket_check<F: FnMut(i64) -> bool + Send + 'static>(
+ ticket_manager: TicketManager,
+ mut free_allocated_fn: F,
+ _runtime_manager: RuntimeManager,
+ ) {
+ thread::spawn(move || {
+ let ticket_store = ticket_manager.ticket_store;
+ loop {
+ let read_view = ticket_store.clone();
+ let mut timeout_tickets = vec![];
+ for ticket in read_view.iter() {
+ if ticket.is_timeout(ticket_manager.ticket_timeout_sec) {
+ timeout_tickets.push(ticket.id);
+ }
+ }
+
+ let mut total_removed_size = 0i64;
+ for timeout_ticket_id in timeout_tickets.iter() {
+ total_removed_size += ticket_store
+ .remove(timeout_ticket_id)
+ .map_or(0, |val| val.1.size);
+ }
+ if total_removed_size != 0 {
+ free_allocated_fn(total_removed_size);
+ debug!("remove {:#?} memory allocated tickets, release
pre-allocated memory size: {:?}", timeout_tickets, total_removed_size);
+ }
+ thread::sleep(Duration::from_secs(
+ ticket_manager.ticket_timeout_check_interval_sec as u64,
+ ));
+ }
+ });
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use crate::runtime::manager::RuntimeManager;
+ use crate::store::mem::ticket::TicketManager;
+ use dashmap::DashMap;
+ use std::sync::{Arc, Mutex};
+ use std::thread;
+ use std::thread::JoinHandle;
+ use std::time::Duration;
+
+ #[test]
+ fn test_closure() {
+ let state = Arc::new(DashMap::new());
+ state.insert(1, 1);
+
+ fn schedule(mut callback: impl FnMut(i64) -> i64 + Send + 'static) ->
JoinHandle<i64> {
+ thread::spawn(move || callback(2))
+ }
+
+ let state_cloned = state.clone();
+ let callback = move |a: i64| {
+ state_cloned.insert(a, a);
+ a + 1
+ };
+ schedule(callback).join().expect("");
+
+ assert!(state.contains_key(&2));
+ }
+
+ #[test]
+ fn test_ticket_manager() {
+ let released_size = Arc::new(Mutex::new(0));
+
+ let release_size_cloned = released_size.clone();
+ let free_allocated_size_func = move |size: i64| {
+ *(release_size_cloned.lock().unwrap()) += size;
+ true
+ };
+ let ticket_manager =
+ TicketManager::new(1, 1, free_allocated_size_func,
RuntimeManager::default());
+ let app_id = "test_ticket_manager_app_id";
+
+ assert!(ticket_manager.delete(1000).is_err());
+
+ // case1
+ ticket_manager.insert(1, 10, crate::util::current_timestamp_sec() + 1,
app_id);
+ ticket_manager.insert(2, 10, crate::util::current_timestamp_sec() + 1,
app_id);
+ assert!(ticket_manager.exist(1));
+ assert!(ticket_manager.exist(2));
+
+ // case2
+ ticket_manager.delete(1).expect("");
+ assert!(!ticket_manager.exist(1));
+ assert!(ticket_manager.exist(2));
+
+ // case3
+ ticket_manager.delete_by_app_id(app_id);
+ assert!(!ticket_manager.exist(2));
+
+ // case4
+ ticket_manager.insert(3, 10, crate::util::current_timestamp_sec() + 1,
app_id);
+ assert!(ticket_manager.exist(3));
+ awaitility::at_most(Duration::from_secs(5)).until(||
!ticket_manager.exist(3));
+ assert_eq!(10, *released_size.lock().unwrap());
+ }
+}
diff --git a/rust/experimental/server/src/store/memory.rs
b/rust/experimental/server/src/store/memory.rs
index c7b0de94d..cfb6ff0e0 100644
--- a/rust/experimental/server/src/store/memory.rs
+++ b/rust/experimental/server/src/store/memory.rs
@@ -17,8 +17,8 @@
use crate::app::ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE;
use crate::app::{
- PartitionedUId, ReadingIndexViewContext, ReadingViewContext,
RequireBufferContext,
- WritingViewContext,
+ PartitionedUId, PurgeDataContext, ReadingIndexViewContext,
ReadingViewContext,
+ ReleaseBufferContext, RequireBufferContext, WritingViewContext,
};
use crate::config::MemoryStoreConfig;
use crate::error::WorkerError;
@@ -39,27 +39,21 @@ use std::collections::{BTreeMap, HashMap};
use std::str::FromStr;
-use crate::store::mem::InstrumentAwait;
-use crate::store::mem::MemoryBufferTicket;
+use crate::store::mem::ticket::TicketManager;
use croaring::Treemap;
-use log::error;
+use spin::mutex::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
-use std::time::Duration;
-use tokio::sync::Mutex;
-use tokio::time::sleep as delay_for;
pub struct MemoryStore {
// todo: change to RW lock
state: DashMap<PartitionedUId, Arc<Mutex<StagingBuffer>>>,
budget: MemoryBudget,
// key: app_id, value: allocated memory size
- memory_allocated_of_app: DashMap<String, DashMap<i64, MemoryBufferTicket>>,
memory_capacity: i64,
- buffer_ticket_timeout_sec: i64,
- buffer_ticket_check_interval_sec: i64,
in_flush_buffer_size: AtomicU64,
runtime_manager: RuntimeManager,
+ ticket_manager: TicketManager,
}
unsafe impl Send for MemoryStore {}
@@ -68,45 +62,59 @@ unsafe impl Sync for MemoryStore {}
impl MemoryStore {
// only for test cases
pub fn new(max_memory_size: i64) -> Self {
+ let budget = MemoryBudget::new(max_memory_size);
+ let runtime_manager: RuntimeManager = Default::default();
+
+ let budget_clone = budget.clone();
+ let free_allocated_size_func =
+ move |size: i64| budget_clone.free_allocated(size).map_or(false,
|v| v);
+ let ticket_manager = TicketManager::new(
+ 5 * 60,
+ 10,
+ free_allocated_size_func,
+ runtime_manager.clone(),
+ );
MemoryStore {
+ budget,
state: DashMap::new(),
- budget: MemoryBudget::new(max_memory_size),
- memory_allocated_of_app: DashMap::new(),
memory_capacity: max_memory_size,
- buffer_ticket_timeout_sec: 5 * 60,
- buffer_ticket_check_interval_sec: 10,
+ ticket_manager,
in_flush_buffer_size: Default::default(),
- runtime_manager: Default::default(),
+ runtime_manager,
}
}
pub fn from(conf: MemoryStoreConfig, runtime_manager: RuntimeManager) ->
Self {
let capacity = ReadableSize::from_str(&conf.capacity).unwrap();
+ let budget = MemoryBudget::new(capacity.as_bytes() as i64);
+
+ let budget_clone = budget.clone();
+ let free_allocated_size_func =
+ move |size: i64| budget_clone.free_allocated(size).map_or(false,
|v| v);
+ let ticket_manager = TicketManager::new(
+ 5 * 60,
+ 10,
+ free_allocated_size_func,
+ runtime_manager.clone(),
+ );
MemoryStore {
state: DashMap::new(),
budget: MemoryBudget::new(capacity.as_bytes() as i64),
- memory_allocated_of_app: DashMap::new(),
memory_capacity: capacity.as_bytes() as i64,
- buffer_ticket_timeout_sec:
conf.buffer_ticket_timeout_sec.unwrap_or(5 * 60),
- buffer_ticket_check_interval_sec: 10,
+ ticket_manager,
in_flush_buffer_size: Default::default(),
runtime_manager,
}
}
- // only for tests
- fn refresh_buffer_ticket_check_interval_sec(&mut self, interval: i64) {
- self.buffer_ticket_check_interval_sec = interval
- }
-
// todo: make this used size as a var
pub async fn memory_usage_ratio(&self) -> f32 {
- let snapshot = self.budget.snapshot().await;
+ let snapshot = self.budget.snapshot();
snapshot.get_used_percent()
}
pub async fn memory_snapshot(&self) -> Result<MemorySnapshot> {
- Ok(self.budget.snapshot().await)
+ Ok(self.budget.snapshot())
}
pub fn get_capacity(&self) -> Result<i64> {
@@ -114,7 +122,7 @@ impl MemoryStore {
}
pub async fn memory_used_ratio(&self) -> f32 {
- let snapshot = self.budget.snapshot().await;
+ let snapshot = self.budget.snapshot();
(snapshot.used + snapshot.allocated
- self.in_flush_buffer_size.load(Ordering::SeqCst) as i64) as f32
/ snapshot.capacity as f32
@@ -128,8 +136,12 @@ impl MemoryStore {
self.in_flush_buffer_size.fetch_sub(size, Ordering::SeqCst);
}
- pub async fn free_memory(&self, size: i64) -> Result<bool> {
- self.budget.free_used(size).await
+ pub async fn free_used(&self, size: i64) -> Result<bool> {
+ self.budget.free_used(size)
+ }
+
+ pub async fn free_allocated(&self, size: i64) -> Result<bool> {
+ self.budget.free_allocated(size)
}
pub async fn get_required_spill_buffer(
@@ -139,7 +151,7 @@ impl MemoryStore {
// sort
// get the spill buffers
- let snapshot = self.budget.snapshot().await;
+ let snapshot = self.budget.snapshot();
let removed_size = snapshot.used - target_len;
if removed_size <= 0 {
return HashMap::new();
@@ -149,7 +161,7 @@ impl MemoryStore {
let buffers = self.state.clone().into_read_only();
for buffer in buffers.iter() {
- let staging_size = buffer.1.lock().await.staging_size;
+ let staging_size = buffer.1.lock().staging_size;
let valset = sorted_tree_map
.entry(staging_size)
.or_insert_with(|| vec![]);
@@ -180,7 +192,7 @@ impl MemoryStore {
pub async fn get_partitioned_buffer_size(&self, uid: &PartitionedUId) ->
Result<u64> {
let buffer = self.get_underlying_partition_buffer(uid);
- let buffer = buffer.lock().await;
+ let buffer = buffer.lock();
Ok(buffer.total_size as u64)
}
@@ -194,7 +206,7 @@ impl MemoryStore {
in_flight_blocks_id: i64,
) -> Result<()> {
let buffer = self.get_or_create_underlying_staging_buffer(uid);
- let mut buffer_ref = buffer.lock().await;
+ let mut buffer_ref = buffer.lock();
buffer_ref.flight_finished(&in_flight_blocks_id)?;
Ok(())
}
@@ -234,108 +246,24 @@ impl MemoryStore {
(fetched, fetched_size)
}
-
- pub(crate) fn is_ticket_exist(&self, app_id: &str, ticket_id: i64) -> bool
{
- self.memory_allocated_of_app
- .get(app_id)
- .map_or(false, |app_entry| app_entry.contains_key(&ticket_id))
- }
-
- /// return the discarded memory allocated size
- pub(crate) fn discard_tickets(&self, app_id: &str, ticket_id_option:
Option<i64>) -> i64 {
- match ticket_id_option {
- None => self
- .memory_allocated_of_app
- .remove(app_id)
- .map_or(0, |app| app.1.iter().map(|x| x.get_size()).sum()),
- Some(ticket_id) =>
self.memory_allocated_of_app.get(app_id).map_or(0, |app| {
- app.remove(&ticket_id)
- .map_or(0, |ticket| ticket.1.get_size())
- }),
- }
- }
-
- fn cache_buffer_required_ticket(
- &self,
- app_id: &str,
- require_buffer: &RequireBufferResponse,
- size: i64,
- ) {
- let app_entry = self
- .memory_allocated_of_app
- .entry(app_id.to_string())
- .or_insert_with(|| DashMap::default());
- app_entry.insert(
- require_buffer.ticket_id,
- MemoryBufferTicket::new(
- require_buffer.ticket_id,
- require_buffer.allocated_timestamp,
- size,
- ),
- );
- }
-
- async fn check_allocated_tickets(&self) {
- // if the ticket is timeout, discard this.
- let mut timeout_ids = vec![];
- let iter = self.memory_allocated_of_app.iter();
- for app in iter {
- let app_id = &app.key().to_string();
- let app_iter = app.iter();
- for ticket in app_iter {
- if ticket.is_timeout(self.buffer_ticket_timeout_sec) {
- timeout_ids.push((app_id.to_string(),
ticket.key().clone()))
- }
- }
- }
- for (app_id, ticket_id) in timeout_ids {
- info!(
- "Releasing timeout ticket of id:{}, app_id:{}",
- ticket_id, app_id
- );
- let released = self.discard_tickets(&app_id, Some(ticket_id));
- if let Err(e) = self.budget.free_allocated(released).await {
- error!(
- "Errors on removing the timeout ticket of id:{},
app_id:{}. error: {:?}",
- ticket_id, app_id, e
- );
- }
- }
- }
}
#[async_trait]
impl Store for MemoryStore {
fn start(self: Arc<Self>) {
- // schedule check to find out the timeout allocated buffer ticket
- let mem_store = self.clone();
- self.runtime_manager.default_runtime.spawn(async move {
- loop {
- mem_store.check_allocated_tickets().await;
- delay_for(Duration::from_secs(
- mem_store.buffer_ticket_check_interval_sec as u64,
- ))
- .await;
- }
- });
+ // ignore
}
async fn insert(&self, ctx: WritingViewContext) -> Result<(), WorkerError>
{
let uid = ctx.uid;
let buffer = self.get_or_create_underlying_staging_buffer(uid.clone());
- let mut buffer_guarded = buffer
- .lock()
- .instrument_await("trying buffer lock to insert")
- .await;
+ let mut buffer_guarded = buffer.lock();
let blocks = ctx.data_blocks;
let inserted_size = buffer_guarded.add(blocks)?;
drop(buffer_guarded);
- self.budget
- .allocated_to_used(inserted_size)
- .instrument_await("make budget allocated -> used")
- .await?;
+ self.budget.allocated_to_used(inserted_size)?;
TOTAL_MEMORY_USED.inc_by(inserted_size as u64);
@@ -345,10 +273,7 @@ impl Store for MemoryStore {
async fn get(&self, ctx: ReadingViewContext) -> Result<ResponseData,
WorkerError> {
let uid = ctx.uid;
let buffer = self.get_or_create_underlying_staging_buffer(uid);
- let buffer = buffer
- .lock()
- .instrument_await("getting partitioned buffer lock")
- .await;
+ let buffer = buffer.lock();
let options = ctx.reading_options;
let (fetched_blocks, length) = match options {
@@ -462,34 +387,9 @@ impl Store for MemoryStore {
panic!("It should not be invoked.")
}
- async fn require_buffer(
- &self,
- ctx: RequireBufferContext,
- ) -> Result<RequireBufferResponse, WorkerError> {
- let (succeed, ticket_id) = self.budget.pre_allocate(ctx.size).await?;
- match succeed {
- true => {
- let require_buffer_resp =
RequireBufferResponse::new(ticket_id);
- self.cache_buffer_required_ticket(
- ctx.uid.app_id.as_str(),
- &require_buffer_resp,
- ctx.size,
- );
- Ok(require_buffer_resp)
- }
- _ => Err(WorkerError::NO_ENOUGH_MEMORY_TO_BE_ALLOCATED),
- }
- }
-
- async fn purge(&self, app_id: String) -> Result<()> {
- // free allocated
- let released_size = self.discard_tickets(app_id.as_str(), None);
- self.budget.free_allocated(released_size).await?;
- info!(
- "free allocated buffer size:[{}] for app:[{}]",
- released_size,
- app_id.as_str()
- );
+ async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
+ let app_id = ctx.app_id;
+ let shuffle_id_option = ctx.shuffle_id;
// remove the corresponding app's data
let read_only_state_view = self.state.clone().into_read_only();
@@ -497,24 +397,31 @@ impl Store for MemoryStore {
for entry in read_only_state_view.iter() {
let pid = entry.0;
if pid.app_id == app_id {
- _removed_list.push(pid);
+ if ctx.shuffle_id.is_some() {
+ if pid.shuffle_id == shuffle_id_option.unwrap() {
+ _removed_list.push(pid);
+ } else {
+ continue;
+ }
+ } else {
+ _removed_list.push(pid);
+ }
}
}
let mut used = 0;
for removed_pid in _removed_list {
if let Some(entry) = self.state.remove(removed_pid) {
- used += entry.1.lock().await.total_size;
+ used += entry.1.lock().total_size;
}
}
// free used
- self.budget.free_used(used).await?;
+ self.budget.free_used(used)?;
info!(
- "removed used buffer size:[{}] for app:[{}]",
- used,
- app_id.as_str()
+ "removed used buffer size:[{}] for [{:?}], [{:?}]",
+ used, app_id, shuffle_id_option
);
Ok(())
@@ -523,6 +430,31 @@ impl Store for MemoryStore {
async fn is_healthy(&self) -> Result<bool> {
Ok(true)
}
+
+ async fn require_buffer(
+ &self,
+ ctx: RequireBufferContext,
+ ) -> Result<RequireBufferResponse, WorkerError> {
+ let (succeed, ticket_id) = self.budget.pre_allocate(ctx.size)?;
+ match succeed {
+ true => {
+ let require_buffer_resp =
RequireBufferResponse::new(ticket_id);
+ self.ticket_manager.insert(
+ ticket_id,
+ ctx.size,
+ require_buffer_resp.allocated_timestamp,
+ &ctx.uid.app_id,
+ );
+ Ok(require_buffer_resp)
+ }
+ _ => Err(WorkerError::NO_ENOUGH_MEMORY_TO_BE_ALLOCATED),
+ }
+ }
+
+ async fn release_buffer(&self, ctx: ReleaseBufferContext) -> Result<i64,
WorkerError> {
+ let ticket_id = ctx.ticket_id;
+ self.ticket_manager.delete(ticket_id)
+ }
}
/// thread safe, this will be guarded by the lock
@@ -662,12 +594,12 @@ impl MemoryBudget {
}
}
- pub async fn snapshot(&self) -> MemorySnapshot {
+ pub fn snapshot(&self) -> MemorySnapshot {
let inner = self.inner.lock().unwrap();
(inner.capacity, inner.allocated, inner.used).into()
}
- async fn pre_allocate(&self, size: i64) -> Result<(bool, i64)> {
+ fn pre_allocate(&self, size: i64) -> Result<(bool, i64)> {
let mut inner = self.inner.lock().unwrap();
let free_space = inner.capacity - inner.allocated - inner.used;
if free_space < size {
@@ -681,7 +613,7 @@ impl MemoryBudget {
}
}
- async fn allocated_to_used(&self, size: i64) -> Result<bool> {
+ fn allocated_to_used(&self, size: i64) -> Result<bool> {
let mut inner = self.inner.lock().unwrap();
if inner.allocated < size {
inner.allocated = 0;
@@ -694,7 +626,7 @@ impl MemoryBudget {
Ok(true)
}
- async fn free_used(&self, size: i64) -> Result<bool> {
+ fn free_used(&self, size: i64) -> Result<bool> {
let mut inner = self.inner.lock().unwrap();
if inner.used < size {
inner.used = 0;
@@ -706,7 +638,7 @@ impl MemoryBudget {
Ok(true)
}
- async fn free_allocated(&self, size: i64) -> Result<bool> {
+ fn free_allocated(&self, size: i64) -> Result<bool> {
let mut inner = self.inner.lock().unwrap();
if inner.allocated < size {
inner.allocated = 0;
@@ -721,7 +653,7 @@ impl MemoryBudget {
#[cfg(test)]
mod test {
use crate::app::{
- PartitionedUId, ReadingOptions, ReadingViewContext,
RequireBufferContext,
+ PartitionedUId, PurgeDataContext, ReadingOptions, ReadingViewContext,
RequireBufferContext,
WritingViewContext,
};
@@ -733,75 +665,10 @@ mod test {
use bytes::BytesMut;
use core::panic;
use std::sync::Arc;
- use std::thread;
- use std::time::Duration;
- use crate::config::MemoryStoreConfig;
- use crate::runtime::manager::RuntimeManager;
use anyhow::Result;
use croaring::Treemap;
- #[test]
- fn test_ticket_timeout() -> Result<()> {
- let cfg = MemoryStoreConfig::from("2M".to_string(), 1);
- let runtime_manager: RuntimeManager = Default::default();
- let mut store = MemoryStore::from(cfg, runtime_manager.clone());
-
- store.refresh_buffer_ticket_check_interval_sec(1);
-
- let store = Arc::new(store);
- store.clone().start();
-
- let app_id = "mocked-app-id";
- let ctx =
RequireBufferContext::new(PartitionedUId::from(app_id.to_string(), 1, 1), 1000);
- let resp = runtime_manager.wait(store.require_buffer(ctx.clone()))?;
- assert!(store.is_ticket_exist(app_id, resp.ticket_id));
-
- let snapshot = runtime_manager.wait(store.budget.snapshot());
- assert_eq!(snapshot.allocated, 1000);
- assert_eq!(snapshot.used, 0);
-
- thread::sleep(Duration::from_secs(5));
-
- assert!(!store.is_ticket_exist(app_id, resp.ticket_id));
-
- let snapshot = runtime_manager.wait(store.budget.snapshot());
- assert_eq!(snapshot.allocated, 0);
- assert_eq!(snapshot.used, 0);
-
- Ok(())
- }
-
- #[test]
- fn test_memory_buffer_ticket() -> Result<()> {
- let store = MemoryStore::new(1024 * 1000);
- let runtime = store.runtime_manager.clone();
-
- let app_id = "mocked-app-id";
- let ctx =
RequireBufferContext::new(PartitionedUId::from(app_id.to_string(), 1, 1), 1000);
- let resp = runtime.wait(store.require_buffer(ctx.clone()))?;
- let ticket_id_1 = resp.ticket_id;
-
- let resp = runtime.wait(store.require_buffer(ctx.clone()))?;
- let ticket_id_2 = resp.ticket_id;
-
- assert!(store.is_ticket_exist(app_id, ticket_id_1));
- assert!(store.is_ticket_exist(app_id, ticket_id_2));
- assert!(!store.is_ticket_exist(app_id, 100239));
-
- let snapshot = runtime.wait(store.budget.snapshot());
- assert_eq!(snapshot.allocated, 1000 * 2);
- assert_eq!(snapshot.used, 0);
-
- runtime.wait(store.purge(app_id.to_string()))?;
-
- let snapshot = runtime.wait(store.budget.snapshot());
- assert_eq!(snapshot.allocated, 0);
- assert_eq!(snapshot.used, 0);
-
- Ok(())
- }
-
#[test]
fn test_read_buffer_in_flight() {
let store = MemoryStore::new(1024);
@@ -894,7 +761,7 @@ mod test {
// case4: some data are in inflight blocks
let buffer =
store.get_or_create_underlying_staging_buffer(uid.clone());
- let mut buffer = runtime.wait(buffer.lock());
+ let mut buffer = buffer.lock();
let owned = buffer.staging.to_owned();
buffer.staging.clear();
let mut idx = 0;
@@ -932,7 +799,7 @@ mod test {
// case5: old data in in_flight and latest data in staging.
// read it from the block id 9, and read size of 30
let buffer =
store.get_or_create_underlying_staging_buffer(uid.clone());
- let mut buffer = runtime.wait(buffer.lock());
+ let mut buffer = buffer.lock();
buffer.staging.push(PartitionedDataBlock {
block_id: 20,
length: 10,
@@ -1033,15 +900,12 @@ mod test {
};
match runtime.default_runtime.block_on(store.require_buffer(ctx)) {
Ok(_) => {
- let _ = runtime
- .default_runtime
- .block_on(store.purge("100".to_string()));
+ let _ =
runtime.default_runtime.block_on(store.purge("100".into()));
}
_ => panic!(),
}
let budget = store.budget.inner.lock().unwrap();
- assert_eq!(0, budget.allocated);
assert_eq!(0, budget.used);
assert_eq!(1024 * 1024 * 1024, budget.capacity);
}
@@ -1094,16 +958,24 @@ mod test {
"Failed to obtain weak reference before purge"
);
+ // partial purge for app's one shuffle data
+ runtime
+ .wait(store.purge(PurgeDataContext::new(app_id.to_string(),
Some(shuffle_id))))
+ .expect("");
+ assert!(!store.state.contains_key(&PartitionedUId::from(
+ app_id.to_string(),
+ shuffle_id,
+ partition
+ )));
+
// purge
- runtime.wait(store.purge(app_id.to_string())).expect("");
+ runtime.wait(store.purge(app_id.into())).expect("");
assert!(
weak_ref_before.clone().unwrap().upgrade().is_none(),
"Arc should not exist after purge"
);
- let snapshot = runtime.wait(store.budget.snapshot());
+ let snapshot = store.budget.snapshot();
assert_eq!(snapshot.used, 0);
- // the remaining allocated will be removed.
- assert_eq!(snapshot.allocated, 0);
assert_eq!(snapshot.capacity, 1024);
let data = runtime.wait(store.get(reading_ctx.clone())).expect("");
assert_eq!(0, data.from_memory().shuffle_data_block_segments.len());
diff --git a/rust/experimental/server/src/store/mod.rs
b/rust/experimental/server/src/store/mod.rs
index 9e23545b5..f29c7bf56 100644
--- a/rust/experimental/server/src/store/mod.rs
+++ b/rust/experimental/server/src/store/mod.rs
@@ -23,7 +23,8 @@ pub mod mem;
pub mod memory;
use crate::app::{
- ReadingIndexViewContext, ReadingViewContext, RequireBufferContext,
WritingViewContext,
+ PurgeDataContext, ReadingIndexViewContext, ReadingViewContext,
ReleaseBufferContext,
+ RequireBufferContext, WritingViewContext,
};
use crate::config::Config;
use crate::error::WorkerError;
@@ -166,12 +167,14 @@ pub trait Store {
&self,
ctx: ReadingIndexViewContext,
) -> Result<ResponseDataIndex, WorkerError>;
+ async fn purge(&self, ctx: PurgeDataContext) -> Result<()>;
+ async fn is_healthy(&self) -> Result<bool>;
+
async fn require_buffer(
&self,
ctx: RequireBufferContext,
) -> Result<RequireBufferResponse, WorkerError>;
- async fn purge(&self, app_id: String) -> Result<()>;
- async fn is_healthy(&self) -> Result<bool>;
+ async fn release_buffer(&self, ctx: ReleaseBufferContext) -> Result<i64,
WorkerError>;
}
pub trait Persistent {}
diff --git a/rust/experimental/server/tests/write_read.rs
b/rust/experimental/server/tests/write_read.rs
index 499005a2d..09f29a682 100644
--- a/rust/experimental/server/tests/write_read.rs
+++ b/rust/experimental/server/tests/write_read.rs
@@ -27,6 +27,7 @@ mod tests {
use std::time::Duration;
use tonic::transport::Channel;
+ use uniffle_worker::metric::GAUGE_MEMORY_ALLOCATED;
fn create_mocked_config(grpc_port: i32, capacity: String, local_data_path:
String) -> Config {
Config {
@@ -82,6 +83,9 @@ mod tests {
let client = ShuffleServerClient::connect(format!("http://{}:{}",
"0.0.0.0", port)).await?;
+ // after one batch write/read process, the allocated memory size
should be 0
+ assert_eq!(0, GAUGE_MEMORY_ALLOCATED.get());
+
write_read_for_one_time(client).await
}
}