This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new a98412ec8 [#1407] improvement(rust): Critical bug fix of getting 
blockIds and some optimization (#1408)
a98412ec8 is described below

commit a98412ec8858ce8cebc977ab7b7f48366973cc63
Author: Junfan Zhang <[email protected]>
AuthorDate: Sat Dec 30 09:46:45 2023 +0800

    [#1407] improvement(rust): Critical bug fix of getting blockIds and some 
optimization (#1408)
    
    ### What changes were proposed in this pull request?
    
    To improve the performance of the Rust-based server and fix some bugs
    
    ### Why are the changes needed?
    
    - [x] fix getting blockId failure
    - [x] support unregister/finish shuffle grpc interface explicitly
    - [x] fix the memory reserve when app is deleted
    - [x] fix the buffer ticket reserve when app is deleted.
       - [x] add test cases
       - [x] fix the release bug: when sending partial data failed, the 
allocated memory should be released
       - [x] refactor ticket management
    - [x] optimize the lock performance when high concurrency of writing
    
    Fix: #1407
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Detailed commits log
    
    All the commits below are squashed into one commit.
    
    * feat: unify grpc service start to grpc runtime manager
    * fix(rust): forget to call report blockIds method await
    * feat: support unregister grpc interface
    * fix: free unallocated size for one data sending
    * feat: introduce the cap to constrain memory allocation
    * feat: refactor ticket manager
    * chore: update the latest benchmark with 1T terasort
    * feat: use spin lock to get better latency
    * chore(doc): add netty based server benchmark report
---
 rust/experimental/server/Cargo.lock               |  26 +-
 rust/experimental/server/Cargo.toml               |   3 +
 rust/experimental/server/README.md                |  51 ++--
 rust/experimental/server/src/app.rs               | 109 ++++---
 rust/experimental/server/src/error.rs             |   3 +
 rust/experimental/server/src/grpc.rs              |  76 ++++-
 rust/experimental/server/src/lib.rs               |  35 ++-
 rust/experimental/server/src/main.rs              |  17 +-
 rust/experimental/server/src/mem_allocator/mod.rs |   6 +-
 rust/experimental/server/src/store/hdfs.rs        |  11 +-
 rust/experimental/server/src/store/hybrid.rs      |  51 ++--
 rust/experimental/server/src/store/localfile.rs   |  82 +++--
 rust/experimental/server/src/store/mem/mod.rs     |  30 +-
 rust/experimental/server/src/store/mem/ticket.rs  | 231 ++++++++++++++
 rust/experimental/server/src/store/memory.rs      | 350 +++++++---------------
 rust/experimental/server/src/store/mod.rs         |   9 +-
 rust/experimental/server/tests/write_read.rs      |   4 +
 17 files changed, 687 insertions(+), 407 deletions(-)

diff --git a/rust/experimental/server/Cargo.lock 
b/rust/experimental/server/Cargo.lock
index ec8ed2930..6c0466225 100644
--- a/rust/experimental/server/Cargo.lock
+++ b/rust/experimental/server/Cargo.lock
@@ -127,6 +127,12 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "awaitility"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "46ee60785cbb3a23bde2c462098564a6337432b9fa032a62bb7ddb5e734c135d"
+
 [[package]]
 name = "axum"
 version = "0.6.20"
@@ -265,6 +271,12 @@ version = "1.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223"
 
+[[package]]
+name = "cap"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "6f125eb85b84a24c36b02ed1d22c9dd8632f53b3cde6e4d23512f94021030003"
+
 [[package]]
 name = "cassowary"
 version = "0.3.0"
@@ -2243,7 +2255,7 @@ dependencies = [
  "cc",
  "libc",
  "once_cell",
- "spin",
+ "spin 0.5.2",
  "untrusted",
  "web-sys",
  "winapi",
@@ -2539,6 +2551,15 @@ version = "0.5.2"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
 
+[[package]]
+name = "spin"
+version = "0.9.8"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
+dependencies = [
+ "lock_api",
+]
+
 [[package]]
 name = "sse-codec"
 version = "0.3.2"
@@ -3174,7 +3195,9 @@ dependencies = [
  "async-channel",
  "async-trait",
  "await-tree",
+ "awaitility",
  "bytes 1.5.0",
+ "cap",
  "clap",
  "console-subscriber",
  "crc32fast",
@@ -3198,6 +3221,7 @@ dependencies = [
  "serde",
  "signal-hook",
  "socket2 0.4.9",
+ "spin 0.9.8",
  "tempdir",
  "tempfile",
  "thiserror",
diff --git a/rust/experimental/server/Cargo.toml 
b/rust/experimental/server/Cargo.toml
index 62a784265..8c1f97b39 100644
--- a/rust/experimental/server/Cargo.toml
+++ b/rust/experimental/server/Cargo.toml
@@ -95,6 +95,8 @@ pin-project-lite = "0.2.8"
 signal-hook = "0.3.17"
 clap = "3.0.14"
 socket2 = { version="0.4", features = ["all"]}
+cap = "0.1.2"
+spin = "0.9.8"
 
 [dependencies.hdrs]
 version = "0.3.0"
@@ -122,6 +124,7 @@ prost-build = "0.11.9"
 
 [dev-dependencies]
 env_logger = "0.10.0"
+awaitility = "0.3.1"
 
 [profile.dev]
 # re-enable debug assertions when pprof-rs fixed the reports for misaligned 
pointer dereferences
diff --git a/rust/experimental/server/README.md 
b/rust/experimental/server/README.md
index 52ef6a3a5..cc8d1ec8d 100644
--- a/rust/experimental/server/README.md
+++ b/rust/experimental/server/README.md
@@ -20,18 +20,20 @@ Another implementation of Apache Uniffle shuffle server
 ## Benchmark report
 
 #### Environment
-_Software_: Uniffle 0.7.0 / Hadoop 3.2.2 / Spark 3.1.2
+
+_Software_: Uniffle 0.8.0 / Hadoop 3.2.2 / Spark 3.1.2
 
 _Hardware_: Machine 96 cores, 512G memory, 1T * 4 SSD, network bandwidth 8GB/s
 
 _Hadoop Yarn Cluster_: 1 * ResourceManager + 40 * NodeManager, every machine 
4T * 4 HDD
 
-_Uniffle Cluster_: 1 * Coordinator + 5 * Shuffle Server, every machine 1T * 4 
SSD
+_Uniffle Cluster_: 1 * Coordinator + 1 * Shuffle Server, every machine 1T * 4 
NVME SSD
 
 #### Configuration
+
 spark's conf
-``` 
-spark.executor.instances 100
+```yaml
+spark.executor.instances 400
 spark.executor.cores 1
 spark.executor.memory 2g
 spark.shuffle.manager org.apache.spark.shuffle.RssShuffleManager
@@ -39,42 +41,51 @@ spark.rss.storage.type MEMORY_LOCALFILE
 ``` 
 
 uniffle grpc-based server's conf
-``` 
-JVM XMX=130g
+``` yaml
+JVM XMX=30g
+
+# JDK11 + G1 
 
-...
-rss.server.buffer.capacity 100g
-rss.server.read.buffer.capacity 20g
+rss.server.buffer.capacity 10g
+rss.server.read.buffer.capacity 10g
 rss.server.flush.thread.alive 10
 rss.server.flush.threadPool.size 50
 rss.server.high.watermark.write 80
 rss.server.low.watermark.write 70
-...
 ``` 
 
-uniffle-x(Rust-based)'s conf
+Rust-based shuffle-server conf
 ```
 store_type = "MEMORY_LOCALFILE"
 
 [memory_store]
-capacity = "100G"
+capacity = "10G"
 
 [localfile_store]
 data_paths = ["/data1/uniffle", "/data2/uniffle", "/data3/uniffle", 
"/data4/uniffle"]
 healthy_check_min_disks = 0
 
 [hybrid_store]
-memory_spill_high_watermark = 0.5
-memory_spill_low_watermark = 0.4
+memory_spill_high_watermark = 0.8
+memory_spill_low_watermark = 0.7
 ``` 
 
-#### Tera Sort
-| type                         |       100G       |           1T          | 5T 
(run with 400 executors) |
-|------------------------------|:----------------:|:---------------------:|:---------------------------:|
-| vanilla uniffle (grpc-based) | 1.4min (29s/53s) | 12min (4.7min/7.0min) |    
18.7min(12min/6.7min)    |
-| uniffle-x                    | 1.3min (28s/48s) | 11min (4.3min/6.5min) |    
14min(7.8min/6.2min)     |
+#### TeraSort cost times
+| type/buffer capacity                 | 250G (compressed)  |                  
       comment                          |
+|--------------------------------------|:------------------:|:--------------------------------------------------------:|
+| vanilla uniffle (grpc-based)  / 10g  |  5.3min (2.3m/3m)  |                  
        1.9G/s                          |
+| vanilla uniffle (grpc-based)  / 300g | 5.6min (3.7m/1.9m) |              GC 
occurs frequently / 2.5G/s               |
+| vanilla uniffle (netty-based) / 10g  |         /          | read failed. 
2.5G/s (write is better due to zero copy)   |
+| vanilla uniffle (netty-based) / 300g |         /          |                  
       app hang                         |
+| rust based shuffle server     / 10g  | 4.6min (2.2m/2.4m) |                  
       2.0 G/s                          |
+| rust based shuffle server     / 300g |  4min (1.5m/2.5m)  |                  
       3.5 G/s                          |
+
+
+Compared with grpc based server, rust-based server has less memory footprint 
and stable performance.  
+
+And Netty is still not stable for production env.
 
-> Tips: When running 5T on vanilla Uniffle, data sent timeouts may occur, and 
there can be occasional failures in client fetching the block bitmap.
+In the future, rust-based server will use io_uring mechanism to improve 
writing performance.
 
 ## Build
 
diff --git a/rust/experimental/server/src/app.rs 
b/rust/experimental/server/src/app.rs
index d75f8f969..9298fb6e3 100644
--- a/rust/experimental/server/src/app.rs
+++ b/rust/experimental/server/src/app.rs
@@ -31,7 +31,7 @@ use crate::store::{
     StoreProvider,
 };
 use crate::util::current_timestamp_sec;
-use anyhow::Result;
+use anyhow::{anyhow, Result};
 use bytes::Bytes;
 use croaring::treemap::JvmSerializer;
 use croaring::Treemap;
@@ -173,13 +173,15 @@ impl App {
         Ok(())
     }
 
-    pub async fn insert(&self, ctx: WritingViewContext) -> Result<(), 
WorkerError> {
+    pub async fn insert(&self, ctx: WritingViewContext) -> Result<i32, 
WorkerError> {
         let len: i32 = ctx.data_blocks.iter().map(|block| block.length).sum();
         self.get_underlying_partition_bitmap(ctx.uid.clone())
             .incr_data_size(len)?;
         TOTAL_RECEIVED_DATA.inc_by(len as u64);
 
-        self.store.insert(ctx).await
+        self.store.insert(ctx).await?;
+
+        Ok(len)
     }
 
     pub async fn select(&self, ctx: ReadingViewContext) -> 
Result<ResponseData, WorkerError> {
@@ -231,13 +233,8 @@ impl App {
         }
     }
 
-    pub fn is_buffer_ticket_exist(&self, ticket_id: i64) -> bool {
-        self.store
-            .is_buffer_ticket_exist(self.app_id.as_str(), ticket_id)
-    }
-
-    pub fn discard_tickets(&self, ticket_id: i64) -> i64 {
-        self.store.discard_tickets(self.app_id.as_str(), ticket_id)
+    pub async fn free_allocated_memory_size(&self, size: i64) -> Result<bool> {
+        self.store.free_hot_store_allocated_memory_size(size).await
     }
 
     pub async fn require_buffer(
@@ -252,6 +249,12 @@ impl App {
         self.store.require_buffer(ctx).await
     }
 
+    pub async fn release_buffer(&self, ticket_id: i64) -> Result<i64, 
WorkerError> {
+        self.store
+            .release_buffer(ReleaseBufferContext::from(ticket_id))
+            .await
+    }
+
     fn get_underlying_partition_bitmap(&self, uid: PartitionedUId) -> 
PartitionedMeta {
         let shuffle_id = uid.shuffle_id;
         let partition_id = uid.partition_id;
@@ -277,13 +280,30 @@ impl App {
     }
 
     pub async fn purge(&self, app_id: String, shuffle_id: Option<i32>) -> 
Result<()> {
-        if shuffle_id.is_some() {
-            error!("Partial purge is not supported.");
-        } else {
-            self.store.purge(app_id).await?
-        }
+        self.store
+            .purge(PurgeDataContext::new(app_id, shuffle_id))
+            .await
+    }
+}
 
-        Ok(())
+#[derive(Debug, Clone)]
+pub struct PurgeDataContext {
+    pub(crate) app_id: String,
+    pub(crate) shuffle_id: Option<i32>,
+}
+
+impl PurgeDataContext {
+    pub fn new(app_id: String, shuffle_id: Option<i32>) -> PurgeDataContext {
+        PurgeDataContext { app_id, shuffle_id }
+    }
+}
+
+impl From<&str> for PurgeDataContext {
+    fn from(app_id_ref: &str) -> Self {
+        PurgeDataContext {
+            app_id: app_id_ref.to_string(),
+            shuffle_id: None,
+        }
     }
 }
 
@@ -321,6 +341,17 @@ pub struct RequireBufferContext {
     pub size: i64,
 }
 
+#[derive(Debug, Clone)]
+pub struct ReleaseBufferContext {
+    pub(crate) ticket_id: i64,
+}
+
+impl From<i64> for ReleaseBufferContext {
+    fn from(value: i64) -> Self {
+        Self { ticket_id: value }
+    }
+}
+
 impl RequireBufferContext {
     pub fn new(uid: PartitionedUId, size: i64) -> Self {
         Self { uid, size }
@@ -343,7 +374,7 @@ pub enum PurgeEvent {
     // app_id
     HEART_BEAT_TIMEOUT(String),
     // app_id + shuffle_id
-    APP_PARTIAL_SHUFFLES_PURGE(String, Vec<i32>),
+    APP_PARTIAL_SHUFFLES_PURGE(String, i32),
     // app_id
     APP_PURGE(String),
 }
@@ -424,18 +455,18 @@ impl AppManager {
                             "The app:[{}]'s data will be purged due to 
heartbeat timeout",
                             &app_id
                         );
-                        app_manager_cloned.purge_app_data(app_id).await
+                        app_manager_cloned.purge_app_data(app_id, None).await
                     }
                     PurgeEvent::APP_PURGE(app_id) => {
                         info!(
                             "The app:[{}] has been finished, its data will be 
purged.",
                             &app_id
                         );
-                        app_manager_cloned.purge_app_data(app_id).await
+                        app_manager_cloned.purge_app_data(app_id, None).await
                     }
-                    PurgeEvent::APP_PARTIAL_SHUFFLES_PURGE(_app_id, 
_shuffle_ids) => {
-                        info!("Partial data purge is not supported currently");
-                        Ok(())
+                    PurgeEvent::APP_PARTIAL_SHUFFLES_PURGE(app_id, shuffle_id) 
=> {
+                        info!("The app:[{:?}] with shuffleId: [{:?}] will be 
purged due to unregister grpc interface", &app_id, shuffle_id);
+                        app_manager_cloned.purge_app_data(app_id, 
Some(shuffle_id)).await
                     }
                 }
                 .map_err(|err| error!("Errors on purging data. error: {:?}", 
err));
@@ -457,19 +488,16 @@ impl AppManager {
         self.store.memory_spill_event_num()
     }
 
-    async fn purge_app_data(&self, app_id: String) -> Result<()> {
-        let app = self.get_app(&app_id);
-        if app.is_none() {
-            error!(
-                "App:{} don't exist when purging data, this should not happen",
-                &app_id
-            );
-        } else {
-            let app = app.unwrap();
-            app.purge(app_id.clone(), None).await?;
-        }
+    async fn purge_app_data(&self, app_id: String, shuffle_id_option: 
Option<i32>) -> Result<()> {
+        let app = self.get_app(&app_id).ok_or(anyhow!(format!(
+            "App:{} don't exist when purging data, this should not happen",
+            &app_id
+        )))?;
+        app.purge(app_id.clone(), shuffle_id_option).await?;
 
-        self.apps.remove(&app_id);
+        if shuffle_id_option.is_none() {
+            self.apps.remove(&app_id);
+        }
 
         Ok(())
     }
@@ -520,13 +548,10 @@ impl AppManager {
         app_ref.register_shuffle(shuffle_id)
     }
 
-    pub async fn unregister(&self, app_id: String, shuffle_ids: 
Option<Vec<i32>>) -> Result<()> {
-        let event = match shuffle_ids {
-            Some(ids) => PurgeEvent::APP_PARTIAL_SHUFFLES_PURGE(app_id, ids),
-            _ => PurgeEvent::APP_PURGE(app_id),
-        };
-
-        self.sender.send(event).await?;
+    pub async fn unregister(&self, app_id: String, shuffle_id: i32) -> 
Result<()> {
+        self.sender
+            .send(PurgeEvent::APP_PARTIAL_SHUFFLES_PURGE(app_id, shuffle_id))
+            .await?;
         Ok(())
     }
 }
@@ -649,7 +674,7 @@ mod test {
 
             // case3: purge
             app_manager_ref
-                .purge_app_data(app_id.to_string())
+                .purge_app_data(app_id.to_string(), None)
                 .await
                 .expect("");
 
diff --git a/rust/experimental/server/src/error.rs 
b/rust/experimental/server/src/error.rs
index 0f411af0c..cc07536a5 100644
--- a/rust/experimental/server/src/error.rs
+++ b/rust/experimental/server/src/error.rs
@@ -48,6 +48,9 @@ pub enum WorkerError {
 
     #[error("Http request failed. {0}")]
     HTTP_SERVICE_ERROR(String),
+
+    #[error("Ticket id: {0} not exist")]
+    TICKET_ID_NOT_EXIST(i64),
 }
 
 impl From<AcquireError> for WorkerError {
diff --git a/rust/experimental/server/src/grpc.rs 
b/rust/experimental/server/src/grpc.rs
index 0c9730522..7c784c245 100644
--- a/rust/experimental/server/src/grpc.rs
+++ b/rust/experimental/server/src/grpc.rs
@@ -103,12 +103,24 @@ impl ShuffleServer for DefaultShuffleServer {
 
     async fn unregister_shuffle(
         &self,
-        _request: Request<ShuffleUnregisterRequest>,
+        request: Request<ShuffleUnregisterRequest>,
     ) -> Result<Response<ShuffleUnregisterResponse>, Status> {
-        // todo: implement shuffle level deletion
-        info!("Accepted unregister shuffle info....");
+        let request = request.into_inner();
+        let shuffle_id = request.shuffle_id;
+        let app_id = request.app_id;
+
+        info!(
+            "Accepted unregister shuffle info for [app:{:?}, shuffle_id:{:?}]",
+            &app_id, shuffle_id
+        );
+        let status_code = self
+            .app_manager_ref
+            .unregister(app_id, shuffle_id)
+            .await
+            .map_or_else(|_e| StatusCode::INTERNAL_ERROR, |_| 
StatusCode::SUCCESS);
+
         Ok(Response::new(ShuffleUnregisterResponse {
-            status: StatusCode::SUCCESS.into(),
+            status: status_code.into(),
             ret_msg: "".to_string(),
         }))
     }
@@ -137,15 +149,16 @@ impl ShuffleServer for DefaultShuffleServer {
 
         let app = app_option.unwrap();
 
-        if !app.is_buffer_ticket_exist(ticket_id) {
+        let release_result = app.release_buffer(ticket_id).await;
+        if release_result.is_err() {
             return Ok(Response::new(SendShuffleDataResponse {
                 status: StatusCode::NO_BUFFER.into(),
                 ret_msg: "No such buffer ticket id, it may be discarded due to 
timeout".to_string(),
             }));
-        } else {
-            app.discard_tickets(ticket_id);
         }
 
+        let ticket_required_size = release_result.unwrap();
+
         let mut blocks_map = HashMap::new();
         for shuffle_data in req.shuffle_data {
             let data: PartitionedData = shuffle_data.into();
@@ -156,7 +169,15 @@ impl ShuffleServer for DefaultShuffleServer {
             blocks.extend(partitioned_blocks);
         }
 
+        let mut inserted_failure_occurs = false;
+        let mut inserted_failure_error = None;
+        let mut inserted_total_size = 0;
+
         for (partition_id, blocks) in blocks_map.into_iter() {
+            if inserted_failure_occurs {
+                continue;
+            }
+
             let uid = PartitionedUId {
                 app_id: app_id.clone(),
                 shuffle_id,
@@ -178,13 +199,35 @@ impl ShuffleServer for DefaultShuffleServer {
                     inserted.err()
                 );
                 error!("{}", &err);
-                return Ok(Response::new(SendShuffleDataResponse {
-                    status: StatusCode::INTERNAL_ERROR.into(),
-                    ret_msg: err,
-                }));
+
+                inserted_failure_error = Some(err);
+                inserted_failure_occurs = true;
+                continue;
+            }
+
+            let inserted_size = inserted.unwrap();
+            inserted_total_size += inserted_size as i64;
+        }
+
+        let unused_allocated_size = ticket_required_size - inserted_total_size;
+        if unused_allocated_size != 0 {
+            debug!("The required buffer size:[{:?}] has remaining allocated 
size:[{:?}] of unused, this should not happen",
+                ticket_required_size, unused_allocated_size);
+            if let Err(e) = 
app.free_allocated_memory_size(unused_allocated_size).await {
+                warn!(
+                    "Errors on free allocated size: {:?} for app: {:?}. err: 
{:#?}",
+                    unused_allocated_size, &app_id, e
+                );
             }
         }
 
+        if inserted_failure_occurs {
+            return Ok(Response::new(SendShuffleDataResponse {
+                status: StatusCode::INTERNAL_ERROR.into(),
+                ret_msg: inserted_failure_error.unwrap(),
+            }));
+        }
+
         timer.observe_duration();
 
         Ok(Response::new(SendShuffleDataResponse {
@@ -446,7 +489,16 @@ impl ShuffleServer for DefaultShuffleServer {
                 },
                 blocks: partition_to_block_id.block_ids,
             };
-            let _ = app.report_block_ids(ctx);
+
+            match app.report_block_ids(ctx).await {
+                Err(e) => {
+                    return Ok(Response::new(ReportShuffleResultResponse {
+                        status: StatusCode::INTERNAL_ERROR.into(),
+                        ret_msg: e.to_string(),
+                    }))
+                }
+                _ => (),
+            }
         }
 
         Ok(Response::new(ReportShuffleResultResponse {
diff --git a/rust/experimental/server/src/lib.rs 
b/rust/experimental/server/src/lib.rs
index 4cf340392..1219285b1 100644
--- a/rust/experimental/server/src/lib.rs
+++ b/rust/experimental/server/src/lib.rs
@@ -40,13 +40,15 @@ use 
crate::proto::uniffle::shuffle_server_client::ShuffleServerClient;
 use crate::proto::uniffle::shuffle_server_server::ShuffleServerServer;
 use crate::proto::uniffle::{
     GetLocalShuffleDataRequest, GetLocalShuffleIndexRequest, 
GetMemoryShuffleDataRequest,
-    RequireBufferRequest, SendShuffleDataRequest, ShuffleBlock, ShuffleData,
-    ShuffleRegisterRequest,
+    GetShuffleResultRequest, PartitionToBlockIds, ReportShuffleResultRequest, 
RequireBufferRequest,
+    SendShuffleDataRequest, ShuffleBlock, ShuffleData, ShuffleRegisterRequest,
 };
 use crate::runtime::manager::RuntimeManager;
 use crate::util::gen_worker_uid;
 use anyhow::Result;
 use bytes::{Buf, Bytes, BytesMut};
+use croaring::treemap::JvmSerializer;
+use croaring::Treemap;
 use log::info;
 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 use std::time::Duration;
@@ -164,6 +166,20 @@ pub async fn write_read_for_one_time(mut client: 
ShuffleServerClient<Channel>) -
 
         let response = response.into_inner();
         assert_eq!(0, response.status);
+
+        // report the finished block ids
+        client
+            .report_shuffle_result(ReportShuffleResultRequest {
+                app_id: app_id.clone(),
+                shuffle_id: 0,
+                task_attempt_id: 0,
+                bitmap_num: 0,
+                partition_to_block_ids: vec![PartitionToBlockIds {
+                    partition_id: idx,
+                    block_ids: vec![idx as i64],
+                }],
+            })
+            .await?;
     }
 
     tokio::time::sleep(Duration::from_secs(1)).await;
@@ -173,6 +189,21 @@ pub async fn write_read_for_one_time(mut client: 
ShuffleServerClient<Channel>) -
 
     // firstly. read from the memory
     for idx in 0..batch_size {
+        let block_id_result = client
+            .get_shuffle_result(GetShuffleResultRequest {
+                app_id: app_id.clone(),
+                shuffle_id: 0,
+                partition_id: idx,
+            })
+            .await?
+            .into_inner();
+
+        assert_eq!(0, block_id_result.status);
+
+        let block_id_bitmap = 
Treemap::deserialize(&*block_id_result.serialized_bitmap)?;
+        assert_eq!(1, block_id_bitmap.iter().count());
+        assert!(block_id_bitmap.contains(idx as u64));
+
         let response_data = client
             .get_memory_shuffle_data(GetMemoryShuffleDataRequest {
                 app_id: app_id.clone(),
diff --git a/rust/experimental/server/src/main.rs 
b/rust/experimental/server/src/main.rs
index f8cc817c4..d2fafa4d4 100644
--- a/rust/experimental/server/src/main.rs
+++ b/rust/experimental/server/src/main.rs
@@ -32,10 +32,13 @@ use crate::runtime::manager::RuntimeManager;
 use crate::signal::details::graceful_wait_for_signal;
 use crate::util::{gen_worker_uid, get_local_ip};
 
+use crate::mem_allocator::ALLOCATOR;
+use crate::readable_size::ReadableSize;
 use anyhow::Result;
 use clap::{App, Arg};
 use log::{debug, error, info};
 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+use std::str::FromStr;
 use std::time::Duration;
 use tokio::net::TcpListener;
 use tokio::sync::broadcast;
@@ -60,6 +63,7 @@ pub mod signal;
 pub mod store;
 mod util;
 
+const MAX_MEMORY_ALLOCATION_SIZE_ENV_KEY: &str = "MAX_MEMORY_ALLOCATION_SIZE";
 const DEFAULT_SHUFFLE_SERVER_TAG: &str = "ss_v4";
 
 fn start_coordinator_report(
@@ -170,6 +174,8 @@ fn init_log(log: &LogConfig) -> WorkerGuard {
 }
 
 fn main() -> Result<()> {
+    setup_max_memory_allocation();
+
     let args_match = App::new("Uniffle Worker")
         .version("0.9.0-SNAPSHOT")
         .about("Rust based shuffle server for Apache Uniffle")
@@ -232,7 +238,7 @@ fn main() -> Result<()> {
             .max_decoding_message_size(usize::MAX)
             .max_encoding_message_size(usize::MAX);
         let service_tx = tx.subscribe();
-        std::thread::spawn(move || {
+        runtime_manager.grpc_runtime.spawn_blocking(move || {
             tokio::runtime::Builder::new_current_thread()
                 .enable_all()
                 .build()
@@ -246,6 +252,13 @@ fn main() -> Result<()> {
     Ok(())
 }
 
+fn setup_max_memory_allocation() {
+    let _ = std::env::var(MAX_MEMORY_ALLOCATION_SIZE_ENV_KEY).map(|v| {
+        let readable_size = ReadableSize::from_str(v.as_str()).unwrap();
+        ALLOCATOR.set_limit(readable_size.as_bytes() as usize)
+    });
+}
+
 async fn grpc_serve(
     service: ShuffleServerServer<DefaultShuffleServer>,
     addr: SocketAddr,
@@ -283,7 +296,7 @@ async fn grpc_serve(
             if let Err(err) = rx.recv().await {
                 error!("Errors on stopping the GRPC service, err: {:?}.", err);
             } else {
-                info!("GRPC service has been graceful stopped.");
+                debug!("GRPC service has been graceful stopped.");
             }
         })
         .await
diff --git a/rust/experimental/server/src/mem_allocator/mod.rs 
b/rust/experimental/server/src/mem_allocator/mod.rs
index 6751643ee..d075f4ea6 100644
--- a/rust/experimental/server/src/mem_allocator/mod.rs
+++ b/rust/experimental/server/src/mem_allocator/mod.rs
@@ -24,9 +24,10 @@ mod imp;
 #[path = "system_std.rs"]
 mod imp;
 
-// set default allocator
+use cap::Cap;
+
 #[global_allocator]
-static ALLOC: imp::Allocator = imp::allocator();
+pub static ALLOCATOR: Cap<imp::Allocator> = Cap::new(imp::allocator(), 
usize::max_value());
 
 pub mod error;
 pub type AllocStats = Vec<(&'static str, usize)>;
@@ -34,6 +35,7 @@ pub type AllocStats = Vec<(&'static str, usize)>;
 // when memory-prof feature is enabled, provide empty profiling functions
 #[cfg(not(all(unix, feature = "memory-prof")))]
 mod default;
+
 #[cfg(not(all(unix, feature = "memory-prof")))]
 pub use default::*;
 
diff --git a/rust/experimental/server/src/store/hdfs.rs 
b/rust/experimental/server/src/store/hdfs.rs
index f783b379e..c36af4732 100644
--- a/rust/experimental/server/src/store/hdfs.rs
+++ b/rust/experimental/server/src/store/hdfs.rs
@@ -16,8 +16,8 @@
 // under the License.
 
 use crate::app::{
-    PartitionedUId, ReadingIndexViewContext, ReadingViewContext, 
RequireBufferContext,
-    WritingViewContext,
+    PartitionedUId, PurgeDataContext, ReadingIndexViewContext, 
ReadingViewContext,
+    ReleaseBufferContext, RequireBufferContext, WritingViewContext,
 };
 use crate::config::HdfsStoreConfig;
 use crate::error::WorkerError;
@@ -246,7 +246,12 @@ impl Store for HdfsStore {
         todo!()
     }
 
-    async fn purge(&self, app_id: String) -> Result<()> {
+    async fn release_buffer(&self, _ctx: ReleaseBufferContext) -> Result<i64, 
WorkerError> {
+        todo!()
+    }
+
+    async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
+        let app_id = ctx.app_id;
         let app_dir = self.get_app_dir(app_id.as_str());
 
         let keys_to_delete: Vec<_> = self
diff --git a/rust/experimental/server/src/store/hybrid.rs 
b/rust/experimental/server/src/store/hybrid.rs
index d81314212..32be53511 100644
--- a/rust/experimental/server/src/store/hybrid.rs
+++ b/rust/experimental/server/src/store/hybrid.rs
@@ -16,8 +16,8 @@
 // under the License.
 
 use crate::app::{
-    PartitionedUId, ReadingIndexViewContext, ReadingOptions, 
ReadingViewContext,
-    RequireBufferContext, WritingViewContext,
+    PartitionedUId, PurgeDataContext, ReadingIndexViewContext, ReadingOptions, 
ReadingViewContext,
+    ReleaseBufferContext, RequireBufferContext, WritingViewContext,
 };
 use crate::await_tree::AWAIT_TREE_REGISTRY;
 
@@ -47,11 +47,12 @@ use std::any::Any;
 use std::collections::VecDeque;
 
 use await_tree::InstrumentAwait;
+use spin::mutex::Mutex;
 use std::str::FromStr;
 use std::sync::Arc;
 
 use crate::runtime::manager::RuntimeManager;
-use tokio::sync::{Mutex, Semaphore};
+use tokio::sync::Semaphore;
 
 trait PersistentStore: Store + Persistent + Send + Sync {}
 impl PersistentStore for LocalFileStore {}
@@ -246,7 +247,7 @@ impl HybridStore {
         self.hot_store
             .release_in_flight_blocks_in_underlying_staging_buffer(uid, 
in_flight_blocks_id)
             .await?;
-        self.hot_store.free_memory(spill_size).await?;
+        self.hot_store.free_used(spill_size).await?;
 
         match self.get_store_type(candidate_store) {
             StorageType::LOCALFILE => {
@@ -261,13 +262,8 @@ impl HybridStore {
         Ok(message)
     }
 
-    // For app to check the ticket allocated existence and discard if it has 
been used
-    pub fn is_buffer_ticket_exist(&self, app_id: &str, ticket_id: i64) -> bool 
{
-        self.hot_store.is_ticket_exist(app_id, ticket_id)
-    }
-
-    pub fn discard_tickets(&self, app_id: &str, ticket_id: i64) -> i64 {
-        self.hot_store.discard_tickets(app_id, Some(ticket_id))
+    pub async fn free_hot_store_allocated_memory_size(&self, size: i64) -> 
Result<bool> {
+        self.hot_store.free_allocated(size).await
     }
 
     pub async fn get_hot_store_memory_snapshot(&self) -> 
Result<MemorySnapshot> {
@@ -404,7 +400,7 @@ impl Store for HybridStore {
                 let buffer = self
                     .hot_store
                     .get_or_create_underlying_staging_buffer(uid.clone());
-                let mut buffer_inner = buffer.lock().await;
+                let mut buffer_inner = buffer.lock();
                 if size.as_bytes() < buffer_inner.get_staging_size()? as u64 {
                     let (in_flight_uid, blocks) = 
buffer_inner.migrate_staging_to_in_flight()?;
                     self.make_memory_buffer_flush(in_flight_uid, blocks, 
uid.clone())
@@ -415,7 +411,7 @@ impl Store for HybridStore {
 
         // if the used size exceed the ratio of high watermark,
         // then send watermark flush trigger
-        if let Ok(_lock) = self.memory_spill_lock.try_lock() {
+        if let Some(_lock) = self.memory_spill_lock.try_lock() {
             let used_ratio = self.hot_store.memory_used_ratio().await;
             if used_ratio > self.config.memory_spill_high_watermark {
                 if let Err(e) = 
self.memory_watermark_flush_trigger_sender.send(()).await {
@@ -457,24 +453,21 @@ impl Store for HybridStore {
             .await
     }
 
-    async fn purge(&self, app_id: String) -> Result<()> {
-        self.hot_store.purge(app_id.clone()).await?;
-        info!("Removed data of app:[{}] in hot store", &app_id);
+    async fn release_buffer(&self, ctx: ReleaseBufferContext) -> Result<i64, 
WorkerError> {
+        self.hot_store.release_buffer(ctx).await
+    }
+
+    async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
+        let app_id = &ctx.app_id;
+        self.hot_store.purge(ctx.clone()).await?;
+        info!("Removed data of app:[{}] in hot store", app_id);
         if self.warm_store.is_some() {
-            self.warm_store
-                .as_ref()
-                .unwrap()
-                .purge(app_id.clone())
-                .await?;
-            info!("Removed data of app:[{}] in warm store", &app_id);
+            self.warm_store.as_ref().unwrap().purge(ctx.clone()).await?;
+            info!("Removed data of app:[{}] in warm store", app_id);
         }
         if self.cold_store.is_some() {
-            self.cold_store
-                .as_ref()
-                .unwrap()
-                .purge(app_id.clone())
-                .await?;
-            info!("Removed data of app:[{}] in cold store", &app_id);
+            self.cold_store.as_ref().unwrap().purge(ctx.clone()).await?;
+            info!("Removed data of app:[{}] in cold store", app_id);
         }
         Ok(())
     }
@@ -511,7 +504,7 @@ pub async fn watermark_flush(store: Arc<HybridStore>) -> 
Result<()> {
 
     let mut flushed_size = 0u64;
     for (partition_id, buffer) in buffers {
-        let mut buffer_inner = buffer.lock().await;
+        let mut buffer_inner = buffer.lock();
         let (in_flight_uid, blocks) = 
buffer_inner.migrate_staging_to_in_flight()?;
         drop(buffer_inner);
         for block in &blocks {
diff --git a/rust/experimental/server/src/store/localfile.rs 
b/rust/experimental/server/src/store/localfile.rs
index 34bd9d97b..e527d8cdb 100644
--- a/rust/experimental/server/src/store/localfile.rs
+++ b/rust/experimental/server/src/store/localfile.rs
@@ -17,8 +17,8 @@
 
 use crate::app::ReadingOptions::FILE_OFFSET_AND_LEN;
 use crate::app::{
-    PartitionedUId, ReadingIndexViewContext, ReadingViewContext, 
RequireBufferContext,
-    WritingViewContext,
+    PartitionedUId, PurgeDataContext, ReadingIndexViewContext, 
ReadingViewContext,
+    ReleaseBufferContext, RequireBufferContext, WritingViewContext,
 };
 use crate::config::LocalfileStoreConfig;
 use crate::error::WorkerError;
@@ -112,6 +112,10 @@ impl LocalFileStore {
         format!("{}", app_id)
     }
 
+    fn gen_relative_path_for_shuffle(app_id: &str, shuffle_id: i32) -> String {
+        format!("{}/{}", app_id, shuffle_id)
+    }
+
     fn gen_relative_path_for_partition(uid: &PartitionedUId) -> (String, 
String) {
         (
             format!(
@@ -426,32 +430,44 @@ impl Store for LocalFileStore {
         todo!()
     }
 
-    async fn purge(&self, app_id: String) -> Result<()> {
-        let app_relative_dir_path = 
LocalFileStore::gen_relative_path_for_app(&app_id);
+    async fn release_buffer(&self, _ctx: ReleaseBufferContext) -> Result<i64, 
WorkerError> {
+        todo!()
+    }
 
-        let all_partition_ids = self.get_app_all_partitions(&app_id);
-        if all_partition_ids.is_empty() {
-            return Ok(());
-        }
+    async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
+        let app_id = ctx.app_id;
+        let shuffle_id_option = ctx.shuffle_id;
+
+        let data_relative_dir_path = match shuffle_id_option {
+            Some(shuffle_id) => 
LocalFileStore::gen_relative_path_for_shuffle(&app_id, shuffle_id),
+            _ => LocalFileStore::gen_relative_path_for_app(&app_id),
+        };
 
         for local_disk_ref in &self.local_disks {
             let disk = local_disk_ref.clone();
-            disk.delete(app_relative_dir_path.to_string()).await?;
+            disk.delete(data_relative_dir_path.to_string()).await?;
         }
 
-        for (shuffle_id, partition_id) in all_partition_ids.into_iter() {
-            // delete lock
-            let uid = PartitionedUId {
-                app_id: app_id.clone(),
-                shuffle_id,
-                partition_id,
-            };
-            let (data_file_path, _) = 
LocalFileStore::gen_relative_path_for_partition(&uid);
-            self.partition_file_locks.remove(&data_file_path);
-        }
+        if shuffle_id_option.is_none() {
+            let all_partition_ids = self.get_app_all_partitions(&app_id);
+            if all_partition_ids.is_empty() {
+                return Ok(());
+            }
 
-        // delete disk mapping
-        self.delete_app(&app_id)?;
+            for (shuffle_id, partition_id) in all_partition_ids.into_iter() {
+                // delete lock
+                let uid = PartitionedUId {
+                    app_id: app_id.clone(),
+                    shuffle_id,
+                    partition_id,
+                };
+                let (data_file_path, _) = 
LocalFileStore::gen_relative_path_for_partition(&uid);
+                self.partition_file_locks.remove(&data_file_path);
+            }
+
+            // delete disk mapping
+            self.delete_app(&app_id)?;
+        }
 
         Ok(())
     }
@@ -739,8 +755,8 @@ impl LocalDisk {
 #[cfg(test)]
 mod test {
     use crate::app::{
-        PartitionedUId, ReadingIndexViewContext, ReadingOptions, 
ReadingViewContext,
-        WritingViewContext,
+        PartitionedUId, PurgeDataContext, ReadingIndexViewContext, 
ReadingOptions,
+        ReadingViewContext, WritingViewContext,
     };
     use crate::store::localfile::{LocalDisk, LocalDiskConfig, LocalFileStore};
 
@@ -805,11 +821,29 @@ mod test {
                 &temp_path, &app_id, "0", "0"
             )))?
         );
-        runtime.wait(local_store.purge(app_id.clone()))?;
+
+        // shuffle level purge
+        runtime
+            .wait(local_store.purge(PurgeDataContext::new(app_id.to_string(), 
Some(0))))
+            .expect("");
+        assert_eq!(
+            false,
+            runtime.wait(tokio::fs::try_exists(format!(
+                "{}/{}/{}",
+                &temp_path, &app_id, 0
+            )))?
+        );
+
+        // app level purge
+        runtime.wait(local_store.purge((&*app_id).into()))?;
         assert_eq!(
             false,
             runtime.wait(tokio::fs::try_exists(format!("{}/{}", &temp_path, 
&app_id)))?
         );
+        assert!(!local_store
+            .partition_file_locks
+            .contains_key(&format!("{}/{}/{}/{}.data", &temp_path, &app_id, 0, 
0)));
+        assert!(!local_store.partition_written_disk_map.contains_key(&app_id));
 
         Ok(())
     }
diff --git a/rust/experimental/server/src/store/mem/mod.rs 
b/rust/experimental/server/src/store/mem/mod.rs
index cae795981..84061067d 100644
--- a/rust/experimental/server/src/store/mem/mod.rs
+++ b/rust/experimental/server/src/store/mem/mod.rs
@@ -15,32 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-pub use await_tree::InstrumentAwait;
-
-pub struct MemoryBufferTicket {
-    id: i64,
-    created_time: u64,
-    size: i64,
-}
-
-impl MemoryBufferTicket {
-    pub fn new(id: i64, created_time: u64, size: i64) -> Self {
-        Self {
-            id,
-            created_time,
-            size,
-        }
-    }
+pub mod ticket;
 
-    pub fn get_size(&self) -> i64 {
-        self.size
-    }
-
-    pub fn is_timeout(&self, timeout_sec: i64) -> bool {
-        crate::util::current_timestamp_sec() - self.created_time > timeout_sec 
as u64
-    }
-
-    pub fn get_id(&self) -> i64 {
-        self.id
-    }
-}
+pub use await_tree::InstrumentAwait;
diff --git a/rust/experimental/server/src/store/mem/ticket.rs 
b/rust/experimental/server/src/store/mem/ticket.rs
new file mode 100644
index 000000000..eaad3796e
--- /dev/null
+++ b/rust/experimental/server/src/store/mem/ticket.rs
@@ -0,0 +1,231 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::WorkerError;
+use crate::runtime::manager::RuntimeManager;
+use anyhow::Result;
+use dashmap::DashMap;
+use log::debug;
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+
+#[derive(Clone)]
+pub struct Ticket {
+    id: i64,
+    created_time: u64,
+    size: i64,
+    owned_by_app_id: String,
+}
+
+impl Ticket {
+    pub fn new(ticket_id: i64, created_time: u64, size: i64, app_id: &str) -> 
Self {
+        Self {
+            id: ticket_id,
+            created_time,
+            size,
+            owned_by_app_id: app_id.into(),
+        }
+    }
+
+    pub fn get_size(&self) -> i64 {
+        self.size
+    }
+
+    pub fn is_timeout(&self, timeout_sec: i64) -> bool {
+        crate::util::current_timestamp_sec() - self.created_time > timeout_sec 
as u64
+    }
+
+    pub fn get_id(&self) -> i64 {
+        self.id
+    }
+}
+
+#[derive(Clone)]
+pub struct TicketManager {
+    // key: ticket_id
+    ticket_store: Arc<DashMap<i64, Ticket>>,
+
+    ticket_timeout_sec: i64,
+    ticket_timeout_check_interval_sec: i64,
+}
+
+impl TicketManager {
+    pub fn new<F: FnMut(i64) -> bool + Send + 'static>(
+        ticket_timeout_sec: i64,
+        ticket_timeout_check_interval_sec: i64,
+        free_allocated_size_func: F,
+        runtime_manager: RuntimeManager,
+    ) -> Self {
+        let manager = Self {
+            ticket_store: Default::default(),
+            ticket_timeout_sec,
+            ticket_timeout_check_interval_sec,
+        };
+        Self::schedule_ticket_check(manager.clone(), free_allocated_size_func, 
runtime_manager);
+        manager
+    }
+
+    /// Check whether a ticket with the given id exists.
+    pub fn exist(&self, ticket_id: i64) -> bool {
+        self.ticket_store.contains_key(&ticket_id)
+    }
+
+    /// Delete one ticket by its id, returning the allocated size for
+    /// this ticket.
+    pub fn delete(&self, ticket_id: i64) -> Result<i64, WorkerError> {
+        if let Some(entry) = self.ticket_store.remove(&ticket_id) {
+            Ok(entry.1.size)
+        } else {
+            Err(WorkerError::TICKET_ID_NOT_EXIST(ticket_id))
+        }
+    }
+
+    /// Delete all the tickets owned by the given app id, and
+    /// return the total allocated size of the tickets that were
+    /// owned by this app_id.
+    pub fn delete_by_app_id(&self, app_id: &str) -> i64 {
+        let read_view = self.ticket_store.clone();
+        let mut deleted_ids = vec![];
+        for ticket in read_view.iter() {
+            if ticket.owned_by_app_id == *app_id {
+                deleted_ids.push(ticket.id);
+            }
+        }
+
+        let mut size = 0i64;
+        for deleted_id in deleted_ids {
+            size += self
+                .ticket_store
+                .remove(&deleted_id)
+                .map_or(0, |val| val.1.size);
+        }
+        size
+    }
+
+    /// Insert one ticket to be managed by this ticket manager.
+    pub fn insert(&self, ticket_id: i64, size: i64, created_timestamp: u64, 
app_id: &str) -> bool {
+        let ticket = Ticket {
+            id: ticket_id,
+            created_time: created_timestamp,
+            size,
+            owned_by_app_id: app_id.into(),
+        };
+
+        self.ticket_store
+            .insert(ticket_id, ticket)
+            .map_or(false, |_| true)
+    }
+
+    fn schedule_ticket_check<F: FnMut(i64) -> bool + Send + 'static>(
+        ticket_manager: TicketManager,
+        mut free_allocated_fn: F,
+        _runtime_manager: RuntimeManager,
+    ) {
+        thread::spawn(move || {
+            let ticket_store = ticket_manager.ticket_store;
+            loop {
+                let read_view = ticket_store.clone();
+                let mut timeout_tickets = vec![];
+                for ticket in read_view.iter() {
+                    if ticket.is_timeout(ticket_manager.ticket_timeout_sec) {
+                        timeout_tickets.push(ticket.id);
+                    }
+                }
+
+                let mut total_removed_size = 0i64;
+                for timeout_ticket_id in timeout_tickets.iter() {
+                    total_removed_size += ticket_store
+                        .remove(timeout_ticket_id)
+                        .map_or(0, |val| val.1.size);
+                }
+                if total_removed_size != 0 {
+                    free_allocated_fn(total_removed_size);
+                    debug!("remove {:#?} memory allocated tickets, release 
pre-allocated memory size: {:?}", timeout_tickets, total_removed_size);
+                }
+                thread::sleep(Duration::from_secs(
+                    ticket_manager.ticket_timeout_check_interval_sec as u64,
+                ));
+            }
+        });
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use crate::runtime::manager::RuntimeManager;
+    use crate::store::mem::ticket::TicketManager;
+    use dashmap::DashMap;
+    use std::sync::{Arc, Mutex};
+    use std::thread;
+    use std::thread::JoinHandle;
+    use std::time::Duration;
+
+    #[test]
+    fn test_closure() {
+        let state = Arc::new(DashMap::new());
+        state.insert(1, 1);
+
+        fn schedule(mut callback: impl FnMut(i64) -> i64 + Send + 'static) -> 
JoinHandle<i64> {
+            thread::spawn(move || callback(2))
+        }
+
+        let state_cloned = state.clone();
+        let callback = move |a: i64| {
+            state_cloned.insert(a, a);
+            a + 1
+        };
+        schedule(callback).join().expect("");
+
+        assert!(state.contains_key(&2));
+    }
+
+    #[test]
+    fn test_ticket_manager() {
+        let released_size = Arc::new(Mutex::new(0));
+
+        let release_size_cloned = released_size.clone();
+        let free_allocated_size_func = move |size: i64| {
+            *(release_size_cloned.lock().unwrap()) += size;
+            true
+        };
+        let ticket_manager =
+            TicketManager::new(1, 1, free_allocated_size_func, 
RuntimeManager::default());
+        let app_id = "test_ticket_manager_app_id";
+
+        assert!(ticket_manager.delete(1000).is_err());
+
+        // case1
+        ticket_manager.insert(1, 10, crate::util::current_timestamp_sec() + 1, 
app_id);
+        ticket_manager.insert(2, 10, crate::util::current_timestamp_sec() + 1, 
app_id);
+        assert!(ticket_manager.exist(1));
+        assert!(ticket_manager.exist(2));
+
+        // case2
+        ticket_manager.delete(1).expect("");
+        assert!(!ticket_manager.exist(1));
+        assert!(ticket_manager.exist(2));
+
+        // case3
+        ticket_manager.delete_by_app_id(app_id);
+        assert!(!ticket_manager.exist(2));
+
+        // case4
+        ticket_manager.insert(3, 10, crate::util::current_timestamp_sec() + 1, 
app_id);
+        assert!(ticket_manager.exist(3));
+        awaitility::at_most(Duration::from_secs(5)).until(|| 
!ticket_manager.exist(3));
+        assert_eq!(10, *released_size.lock().unwrap());
+    }
+}
diff --git a/rust/experimental/server/src/store/memory.rs 
b/rust/experimental/server/src/store/memory.rs
index c7b0de94d..cfb6ff0e0 100644
--- a/rust/experimental/server/src/store/memory.rs
+++ b/rust/experimental/server/src/store/memory.rs
@@ -17,8 +17,8 @@
 
 use crate::app::ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE;
 use crate::app::{
-    PartitionedUId, ReadingIndexViewContext, ReadingViewContext, 
RequireBufferContext,
-    WritingViewContext,
+    PartitionedUId, PurgeDataContext, ReadingIndexViewContext, 
ReadingViewContext,
+    ReleaseBufferContext, RequireBufferContext, WritingViewContext,
 };
 use crate::config::MemoryStoreConfig;
 use crate::error::WorkerError;
@@ -39,27 +39,21 @@ use std::collections::{BTreeMap, HashMap};
 
 use std::str::FromStr;
 
-use crate::store::mem::InstrumentAwait;
-use crate::store::mem::MemoryBufferTicket;
+use crate::store::mem::ticket::TicketManager;
 use croaring::Treemap;
-use log::error;
+use spin::mutex::Mutex;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
-use std::time::Duration;
-use tokio::sync::Mutex;
-use tokio::time::sleep as delay_for;
 
 pub struct MemoryStore {
     // todo: change to RW lock
     state: DashMap<PartitionedUId, Arc<Mutex<StagingBuffer>>>,
     budget: MemoryBudget,
     // key: app_id, value: allocated memory size
-    memory_allocated_of_app: DashMap<String, DashMap<i64, MemoryBufferTicket>>,
     memory_capacity: i64,
-    buffer_ticket_timeout_sec: i64,
-    buffer_ticket_check_interval_sec: i64,
     in_flush_buffer_size: AtomicU64,
     runtime_manager: RuntimeManager,
+    ticket_manager: TicketManager,
 }
 
 unsafe impl Send for MemoryStore {}
@@ -68,45 +62,59 @@ unsafe impl Sync for MemoryStore {}
 impl MemoryStore {
     // only for test cases
     pub fn new(max_memory_size: i64) -> Self {
+        let budget = MemoryBudget::new(max_memory_size);
+        let runtime_manager: RuntimeManager = Default::default();
+
+        let budget_clone = budget.clone();
+        let free_allocated_size_func =
+            move |size: i64| budget_clone.free_allocated(size).map_or(false, 
|v| v);
+        let ticket_manager = TicketManager::new(
+            5 * 60,
+            10,
+            free_allocated_size_func,
+            runtime_manager.clone(),
+        );
         MemoryStore {
+            budget,
             state: DashMap::new(),
-            budget: MemoryBudget::new(max_memory_size),
-            memory_allocated_of_app: DashMap::new(),
             memory_capacity: max_memory_size,
-            buffer_ticket_timeout_sec: 5 * 60,
-            buffer_ticket_check_interval_sec: 10,
+            ticket_manager,
             in_flush_buffer_size: Default::default(),
-            runtime_manager: Default::default(),
+            runtime_manager,
         }
     }
 
     pub fn from(conf: MemoryStoreConfig, runtime_manager: RuntimeManager) -> 
Self {
         let capacity = ReadableSize::from_str(&conf.capacity).unwrap();
+        let budget = MemoryBudget::new(capacity.as_bytes() as i64);
+
+        let budget_clone = budget.clone();
+        let free_allocated_size_func =
+            move |size: i64| budget_clone.free_allocated(size).map_or(false, 
|v| v);
+        let ticket_manager = TicketManager::new(
+            5 * 60,
+            10,
+            free_allocated_size_func,
+            runtime_manager.clone(),
+        );
         MemoryStore {
             state: DashMap::new(),
             budget: MemoryBudget::new(capacity.as_bytes() as i64),
-            memory_allocated_of_app: DashMap::new(),
             memory_capacity: capacity.as_bytes() as i64,
-            buffer_ticket_timeout_sec: 
conf.buffer_ticket_timeout_sec.unwrap_or(5 * 60),
-            buffer_ticket_check_interval_sec: 10,
+            ticket_manager,
             in_flush_buffer_size: Default::default(),
             runtime_manager,
         }
     }
 
-    // only for tests
-    fn refresh_buffer_ticket_check_interval_sec(&mut self, interval: i64) {
-        self.buffer_ticket_check_interval_sec = interval
-    }
-
     // todo: make this used size as a var
     pub async fn memory_usage_ratio(&self) -> f32 {
-        let snapshot = self.budget.snapshot().await;
+        let snapshot = self.budget.snapshot();
         snapshot.get_used_percent()
     }
 
     pub async fn memory_snapshot(&self) -> Result<MemorySnapshot> {
-        Ok(self.budget.snapshot().await)
+        Ok(self.budget.snapshot())
     }
 
     pub fn get_capacity(&self) -> Result<i64> {
@@ -114,7 +122,7 @@ impl MemoryStore {
     }
 
     pub async fn memory_used_ratio(&self) -> f32 {
-        let snapshot = self.budget.snapshot().await;
+        let snapshot = self.budget.snapshot();
         (snapshot.used + snapshot.allocated
             - self.in_flush_buffer_size.load(Ordering::SeqCst) as i64) as f32
             / snapshot.capacity as f32
@@ -128,8 +136,12 @@ impl MemoryStore {
         self.in_flush_buffer_size.fetch_sub(size, Ordering::SeqCst);
     }
 
-    pub async fn free_memory(&self, size: i64) -> Result<bool> {
-        self.budget.free_used(size).await
+    pub async fn free_used(&self, size: i64) -> Result<bool> {
+        self.budget.free_used(size)
+    }
+
+    pub async fn free_allocated(&self, size: i64) -> Result<bool> {
+        self.budget.free_allocated(size)
     }
 
     pub async fn get_required_spill_buffer(
@@ -139,7 +151,7 @@ impl MemoryStore {
         // sort
         // get the spill buffers
 
-        let snapshot = self.budget.snapshot().await;
+        let snapshot = self.budget.snapshot();
         let removed_size = snapshot.used - target_len;
         if removed_size <= 0 {
             return HashMap::new();
@@ -149,7 +161,7 @@ impl MemoryStore {
 
         let buffers = self.state.clone().into_read_only();
         for buffer in buffers.iter() {
-            let staging_size = buffer.1.lock().await.staging_size;
+            let staging_size = buffer.1.lock().staging_size;
             let valset = sorted_tree_map
                 .entry(staging_size)
                 .or_insert_with(|| vec![]);
@@ -180,7 +192,7 @@ impl MemoryStore {
 
     pub async fn get_partitioned_buffer_size(&self, uid: &PartitionedUId) -> 
Result<u64> {
         let buffer = self.get_underlying_partition_buffer(uid);
-        let buffer = buffer.lock().await;
+        let buffer = buffer.lock();
         Ok(buffer.total_size as u64)
     }
 
@@ -194,7 +206,7 @@ impl MemoryStore {
         in_flight_blocks_id: i64,
     ) -> Result<()> {
         let buffer = self.get_or_create_underlying_staging_buffer(uid);
-        let mut buffer_ref = buffer.lock().await;
+        let mut buffer_ref = buffer.lock();
         buffer_ref.flight_finished(&in_flight_blocks_id)?;
         Ok(())
     }
@@ -234,108 +246,24 @@ impl MemoryStore {
 
         (fetched, fetched_size)
     }
-
-    pub(crate) fn is_ticket_exist(&self, app_id: &str, ticket_id: i64) -> bool 
{
-        self.memory_allocated_of_app
-            .get(app_id)
-            .map_or(false, |app_entry| app_entry.contains_key(&ticket_id))
-    }
-
-    /// return the discarded memory allocated size
-    pub(crate) fn discard_tickets(&self, app_id: &str, ticket_id_option: 
Option<i64>) -> i64 {
-        match ticket_id_option {
-            None => self
-                .memory_allocated_of_app
-                .remove(app_id)
-                .map_or(0, |app| app.1.iter().map(|x| x.get_size()).sum()),
-            Some(ticket_id) => 
self.memory_allocated_of_app.get(app_id).map_or(0, |app| {
-                app.remove(&ticket_id)
-                    .map_or(0, |ticket| ticket.1.get_size())
-            }),
-        }
-    }
-
-    fn cache_buffer_required_ticket(
-        &self,
-        app_id: &str,
-        require_buffer: &RequireBufferResponse,
-        size: i64,
-    ) {
-        let app_entry = self
-            .memory_allocated_of_app
-            .entry(app_id.to_string())
-            .or_insert_with(|| DashMap::default());
-        app_entry.insert(
-            require_buffer.ticket_id,
-            MemoryBufferTicket::new(
-                require_buffer.ticket_id,
-                require_buffer.allocated_timestamp,
-                size,
-            ),
-        );
-    }
-
-    async fn check_allocated_tickets(&self) {
-        // if the ticket is timeout, discard this.
-        let mut timeout_ids = vec![];
-        let iter = self.memory_allocated_of_app.iter();
-        for app in iter {
-            let app_id = &app.key().to_string();
-            let app_iter = app.iter();
-            for ticket in app_iter {
-                if ticket.is_timeout(self.buffer_ticket_timeout_sec) {
-                    timeout_ids.push((app_id.to_string(), 
ticket.key().clone()))
-                }
-            }
-        }
-        for (app_id, ticket_id) in timeout_ids {
-            info!(
-                "Releasing timeout ticket of id:{}, app_id:{}",
-                ticket_id, app_id
-            );
-            let released = self.discard_tickets(&app_id, Some(ticket_id));
-            if let Err(e) = self.budget.free_allocated(released).await {
-                error!(
-                    "Errors on removing the timeout ticket of id:{}, 
app_id:{}. error: {:?}",
-                    ticket_id, app_id, e
-                );
-            }
-        }
-    }
 }
 
 #[async_trait]
 impl Store for MemoryStore {
     fn start(self: Arc<Self>) {
-        // schedule check to find out the timeout allocated buffer ticket
-        let mem_store = self.clone();
-        self.runtime_manager.default_runtime.spawn(async move {
-            loop {
-                mem_store.check_allocated_tickets().await;
-                delay_for(Duration::from_secs(
-                    mem_store.buffer_ticket_check_interval_sec as u64,
-                ))
-                .await;
-            }
-        });
+        // ignore
     }
 
     async fn insert(&self, ctx: WritingViewContext) -> Result<(), WorkerError> 
{
         let uid = ctx.uid;
         let buffer = self.get_or_create_underlying_staging_buffer(uid.clone());
-        let mut buffer_guarded = buffer
-            .lock()
-            .instrument_await("trying buffer lock to insert")
-            .await;
+        let mut buffer_guarded = buffer.lock();
 
         let blocks = ctx.data_blocks;
         let inserted_size = buffer_guarded.add(blocks)?;
         drop(buffer_guarded);
 
-        self.budget
-            .allocated_to_used(inserted_size)
-            .instrument_await("make budget allocated -> used")
-            .await?;
+        self.budget.allocated_to_used(inserted_size)?;
 
         TOTAL_MEMORY_USED.inc_by(inserted_size as u64);
 
@@ -345,10 +273,7 @@ impl Store for MemoryStore {
     async fn get(&self, ctx: ReadingViewContext) -> Result<ResponseData, 
WorkerError> {
         let uid = ctx.uid;
         let buffer = self.get_or_create_underlying_staging_buffer(uid);
-        let buffer = buffer
-            .lock()
-            .instrument_await("getting partitioned buffer lock")
-            .await;
+        let buffer = buffer.lock();
 
         let options = ctx.reading_options;
         let (fetched_blocks, length) = match options {
@@ -462,34 +387,9 @@ impl Store for MemoryStore {
         panic!("It should not be invoked.")
     }
 
-    async fn require_buffer(
-        &self,
-        ctx: RequireBufferContext,
-    ) -> Result<RequireBufferResponse, WorkerError> {
-        let (succeed, ticket_id) = self.budget.pre_allocate(ctx.size).await?;
-        match succeed {
-            true => {
-                let require_buffer_resp = 
RequireBufferResponse::new(ticket_id);
-                self.cache_buffer_required_ticket(
-                    ctx.uid.app_id.as_str(),
-                    &require_buffer_resp,
-                    ctx.size,
-                );
-                Ok(require_buffer_resp)
-            }
-            _ => Err(WorkerError::NO_ENOUGH_MEMORY_TO_BE_ALLOCATED),
-        }
-    }
-
-    async fn purge(&self, app_id: String) -> Result<()> {
-        // free allocated
-        let released_size = self.discard_tickets(app_id.as_str(), None);
-        self.budget.free_allocated(released_size).await?;
-        info!(
-            "free allocated buffer size:[{}] for app:[{}]",
-            released_size,
-            app_id.as_str()
-        );
+    async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
+        let app_id = ctx.app_id;
+        let shuffle_id_option = ctx.shuffle_id;
 
         // remove the corresponding app's data
         let read_only_state_view = self.state.clone().into_read_only();
@@ -497,24 +397,31 @@ impl Store for MemoryStore {
         for entry in read_only_state_view.iter() {
             let pid = entry.0;
             if pid.app_id == app_id {
-                _removed_list.push(pid);
+                if ctx.shuffle_id.is_some() {
+                    if pid.shuffle_id == shuffle_id_option.unwrap() {
+                        _removed_list.push(pid);
+                    } else {
+                        continue;
+                    }
+                } else {
+                    _removed_list.push(pid);
+                }
             }
         }
 
         let mut used = 0;
         for removed_pid in _removed_list {
             if let Some(entry) = self.state.remove(removed_pid) {
-                used += entry.1.lock().await.total_size;
+                used += entry.1.lock().total_size;
             }
         }
 
         // free used
-        self.budget.free_used(used).await?;
+        self.budget.free_used(used)?;
 
         info!(
-            "removed used buffer size:[{}] for app:[{}]",
-            used,
-            app_id.as_str()
+            "removed used buffer size:[{}] for [{:?}], [{:?}]",
+            used, app_id, shuffle_id_option
         );
 
         Ok(())
@@ -523,6 +430,31 @@ impl Store for MemoryStore {
     async fn is_healthy(&self) -> Result<bool> {
         Ok(true)
     }
+
+    async fn require_buffer(
+        &self,
+        ctx: RequireBufferContext,
+    ) -> Result<RequireBufferResponse, WorkerError> {
+        let (succeed, ticket_id) = self.budget.pre_allocate(ctx.size)?;
+        match succeed {
+            true => {
+                let require_buffer_resp = 
RequireBufferResponse::new(ticket_id);
+                self.ticket_manager.insert(
+                    ticket_id,
+                    ctx.size,
+                    require_buffer_resp.allocated_timestamp,
+                    &ctx.uid.app_id,
+                );
+                Ok(require_buffer_resp)
+            }
+            _ => Err(WorkerError::NO_ENOUGH_MEMORY_TO_BE_ALLOCATED),
+        }
+    }
+
+    async fn release_buffer(&self, ctx: ReleaseBufferContext) -> Result<i64, 
WorkerError> {
+        let ticket_id = ctx.ticket_id;
+        self.ticket_manager.delete(ticket_id)
+    }
 }
 
 /// thread safe, this will be guarded by the lock
@@ -662,12 +594,12 @@ impl MemoryBudget {
         }
     }
 
-    pub async fn snapshot(&self) -> MemorySnapshot {
+    pub fn snapshot(&self) -> MemorySnapshot {
         let inner = self.inner.lock().unwrap();
         (inner.capacity, inner.allocated, inner.used).into()
     }
 
-    async fn pre_allocate(&self, size: i64) -> Result<(bool, i64)> {
+    fn pre_allocate(&self, size: i64) -> Result<(bool, i64)> {
         let mut inner = self.inner.lock().unwrap();
         let free_space = inner.capacity - inner.allocated - inner.used;
         if free_space < size {
@@ -681,7 +613,7 @@ impl MemoryBudget {
         }
     }
 
-    async fn allocated_to_used(&self, size: i64) -> Result<bool> {
+    fn allocated_to_used(&self, size: i64) -> Result<bool> {
         let mut inner = self.inner.lock().unwrap();
         if inner.allocated < size {
             inner.allocated = 0;
@@ -694,7 +626,7 @@ impl MemoryBudget {
         Ok(true)
     }
 
-    async fn free_used(&self, size: i64) -> Result<bool> {
+    fn free_used(&self, size: i64) -> Result<bool> {
         let mut inner = self.inner.lock().unwrap();
         if inner.used < size {
             inner.used = 0;
@@ -706,7 +638,7 @@ impl MemoryBudget {
         Ok(true)
     }
 
-    async fn free_allocated(&self, size: i64) -> Result<bool> {
+    fn free_allocated(&self, size: i64) -> Result<bool> {
         let mut inner = self.inner.lock().unwrap();
         if inner.allocated < size {
             inner.allocated = 0;
@@ -721,7 +653,7 @@ impl MemoryBudget {
 #[cfg(test)]
 mod test {
     use crate::app::{
-        PartitionedUId, ReadingOptions, ReadingViewContext, 
RequireBufferContext,
+        PartitionedUId, PurgeDataContext, ReadingOptions, ReadingViewContext, 
RequireBufferContext,
         WritingViewContext,
     };
 
@@ -733,75 +665,10 @@ mod test {
     use bytes::BytesMut;
     use core::panic;
     use std::sync::Arc;
-    use std::thread;
-    use std::time::Duration;
 
-    use crate::config::MemoryStoreConfig;
-    use crate::runtime::manager::RuntimeManager;
     use anyhow::Result;
     use croaring::Treemap;
 
-    #[test]
-    fn test_ticket_timeout() -> Result<()> {
-        let cfg = MemoryStoreConfig::from("2M".to_string(), 1);
-        let runtime_manager: RuntimeManager = Default::default();
-        let mut store = MemoryStore::from(cfg, runtime_manager.clone());
-
-        store.refresh_buffer_ticket_check_interval_sec(1);
-
-        let store = Arc::new(store);
-        store.clone().start();
-
-        let app_id = "mocked-app-id";
-        let ctx = 
RequireBufferContext::new(PartitionedUId::from(app_id.to_string(), 1, 1), 1000);
-        let resp = runtime_manager.wait(store.require_buffer(ctx.clone()))?;
-        assert!(store.is_ticket_exist(app_id, resp.ticket_id));
-
-        let snapshot = runtime_manager.wait(store.budget.snapshot());
-        assert_eq!(snapshot.allocated, 1000);
-        assert_eq!(snapshot.used, 0);
-
-        thread::sleep(Duration::from_secs(5));
-
-        assert!(!store.is_ticket_exist(app_id, resp.ticket_id));
-
-        let snapshot = runtime_manager.wait(store.budget.snapshot());
-        assert_eq!(snapshot.allocated, 0);
-        assert_eq!(snapshot.used, 0);
-
-        Ok(())
-    }
-
-    #[test]
-    fn test_memory_buffer_ticket() -> Result<()> {
-        let store = MemoryStore::new(1024 * 1000);
-        let runtime = store.runtime_manager.clone();
-
-        let app_id = "mocked-app-id";
-        let ctx = 
RequireBufferContext::new(PartitionedUId::from(app_id.to_string(), 1, 1), 1000);
-        let resp = runtime.wait(store.require_buffer(ctx.clone()))?;
-        let ticket_id_1 = resp.ticket_id;
-
-        let resp = runtime.wait(store.require_buffer(ctx.clone()))?;
-        let ticket_id_2 = resp.ticket_id;
-
-        assert!(store.is_ticket_exist(app_id, ticket_id_1));
-        assert!(store.is_ticket_exist(app_id, ticket_id_2));
-        assert!(!store.is_ticket_exist(app_id, 100239));
-
-        let snapshot = runtime.wait(store.budget.snapshot());
-        assert_eq!(snapshot.allocated, 1000 * 2);
-        assert_eq!(snapshot.used, 0);
-
-        runtime.wait(store.purge(app_id.to_string()))?;
-
-        let snapshot = runtime.wait(store.budget.snapshot());
-        assert_eq!(snapshot.allocated, 0);
-        assert_eq!(snapshot.used, 0);
-
-        Ok(())
-    }
-
     #[test]
     fn test_read_buffer_in_flight() {
         let store = MemoryStore::new(1024);
@@ -894,7 +761,7 @@ mod test {
 
         // case4: some data are in inflight blocks
         let buffer = 
store.get_or_create_underlying_staging_buffer(uid.clone());
-        let mut buffer = runtime.wait(buffer.lock());
+        let mut buffer = buffer.lock();
         let owned = buffer.staging.to_owned();
         buffer.staging.clear();
         let mut idx = 0;
@@ -932,7 +799,7 @@ mod test {
         // case5: old data in in_flight and latest data in staging.
         // read it from the block id 9, and read size of 30
         let buffer = 
store.get_or_create_underlying_staging_buffer(uid.clone());
-        let mut buffer = runtime.wait(buffer.lock());
+        let mut buffer = buffer.lock();
         buffer.staging.push(PartitionedDataBlock {
             block_id: 20,
             length: 10,
@@ -1033,15 +900,12 @@ mod test {
         };
         match runtime.default_runtime.block_on(store.require_buffer(ctx)) {
             Ok(_) => {
-                let _ = runtime
-                    .default_runtime
-                    .block_on(store.purge("100".to_string()));
+                let _ = 
runtime.default_runtime.block_on(store.purge("100".into()));
             }
             _ => panic!(),
         }
 
         let budget = store.budget.inner.lock().unwrap();
-        assert_eq!(0, budget.allocated);
         assert_eq!(0, budget.used);
         assert_eq!(1024 * 1024 * 1024, budget.capacity);
     }
@@ -1094,16 +958,24 @@ mod test {
             "Failed to obtain weak reference before purge"
         );
 
+        // partial purge for app's one shuffle data
+        runtime
+            .wait(store.purge(PurgeDataContext::new(app_id.to_string(), 
Some(shuffle_id))))
+            .expect("");
+        assert!(!store.state.contains_key(&PartitionedUId::from(
+            app_id.to_string(),
+            shuffle_id,
+            partition
+        )));
+
         // purge
-        runtime.wait(store.purge(app_id.to_string())).expect("");
+        runtime.wait(store.purge(app_id.into())).expect("");
         assert!(
             weak_ref_before.clone().unwrap().upgrade().is_none(),
             "Arc should not exist after purge"
         );
-        let snapshot = runtime.wait(store.budget.snapshot());
+        let snapshot = store.budget.snapshot();
         assert_eq!(snapshot.used, 0);
-        // the remaining allocated will be removed.
-        assert_eq!(snapshot.allocated, 0);
         assert_eq!(snapshot.capacity, 1024);
         let data = runtime.wait(store.get(reading_ctx.clone())).expect("");
         assert_eq!(0, data.from_memory().shuffle_data_block_segments.len());
diff --git a/rust/experimental/server/src/store/mod.rs 
b/rust/experimental/server/src/store/mod.rs
index 9e23545b5..f29c7bf56 100644
--- a/rust/experimental/server/src/store/mod.rs
+++ b/rust/experimental/server/src/store/mod.rs
@@ -23,7 +23,8 @@ pub mod mem;
 pub mod memory;
 
 use crate::app::{
-    ReadingIndexViewContext, ReadingViewContext, RequireBufferContext, 
WritingViewContext,
+    PurgeDataContext, ReadingIndexViewContext, ReadingViewContext, 
ReleaseBufferContext,
+    RequireBufferContext, WritingViewContext,
 };
 use crate::config::Config;
 use crate::error::WorkerError;
@@ -166,12 +167,14 @@ pub trait Store {
         &self,
         ctx: ReadingIndexViewContext,
     ) -> Result<ResponseDataIndex, WorkerError>;
+    async fn purge(&self, ctx: PurgeDataContext) -> Result<()>;
+    async fn is_healthy(&self) -> Result<bool>;
+
     async fn require_buffer(
         &self,
         ctx: RequireBufferContext,
     ) -> Result<RequireBufferResponse, WorkerError>;
-    async fn purge(&self, app_id: String) -> Result<()>;
-    async fn is_healthy(&self) -> Result<bool>;
+    async fn release_buffer(&self, ctx: ReleaseBufferContext) -> Result<i64, 
WorkerError>;
 }
 
 pub trait Persistent {}
diff --git a/rust/experimental/server/tests/write_read.rs 
b/rust/experimental/server/tests/write_read.rs
index 499005a2d..09f29a682 100644
--- a/rust/experimental/server/tests/write_read.rs
+++ b/rust/experimental/server/tests/write_read.rs
@@ -27,6 +27,7 @@ mod tests {
 
     use std::time::Duration;
     use tonic::transport::Channel;
+    use uniffle_worker::metric::GAUGE_MEMORY_ALLOCATED;
 
     fn create_mocked_config(grpc_port: i32, capacity: String, local_data_path: 
String) -> Config {
         Config {
@@ -82,6 +83,9 @@ mod tests {
 
         let client = ShuffleServerClient::connect(format!("http://{}:{}", 
"0.0.0.0", port)).await?;
 
+        // after one batch write/read process, the allocated memory size 
should be 0
+        assert_eq!(0, GAUGE_MEMORY_ALLOCATED.get());
+
         write_read_for_one_time(client).await
     }
 }

Reply via email to