This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d22e5f0c [#1407] feat(rust): support more metrics about disk and topN data size (#1488)
7d22e5f0c is described below

commit 7d22e5f0cedd17f2b14f8d757ec25c5bc287fb39
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Jan 29 13:41:58 2024 +0800

    [#1407] feat(rust): support more metrics about disk and topN data size (#1488)
    
    ### What changes were proposed in this pull request?
    
    1. feat(rust): add metrics for local disk capacity, usage and health, and for data read from memory vs. localfile
    2. feat(rust): expose the resident data size of the topN apps as a gauge
    
    ### Why are the changes needed?
    
    Fix: #1407
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit tests
---
 rust/experimental/server/src/app.rs              | 94 ++++++++++++++++++++++--
 rust/experimental/server/src/grpc.rs             |  5 ++
 rust/experimental/server/src/metric.rs           | 83 +++++++++++++++++++++
 rust/experimental/server/src/store/hdfs.rs       |  9 ++-
 rust/experimental/server/src/store/hybrid.rs     | 20 +++--
 rust/experimental/server/src/store/local/disk.rs | 59 ++++++++++++---
 rust/experimental/server/src/store/localfile.rs  | 11 ++-
 rust/experimental/server/src/store/memory.rs     |  4 +-
 rust/experimental/server/src/store/mod.rs        |  2 +-
 9 files changed, 252 insertions(+), 35 deletions(-)

diff --git a/rust/experimental/server/src/app.rs 
b/rust/experimental/server/src/app.rs
index 35699f780..7d5328cc9 100644
--- a/rust/experimental/server/src/app.rs
+++ b/rust/experimental/server/src/app.rs
@@ -18,8 +18,9 @@
 use crate::config::Config;
 use crate::error::WorkerError;
 use crate::metric::{
-    GAUGE_APP_NUMBER, TOTAL_APP_NUMBER, 
TOTAL_HUGE_PARTITION_REQUIRE_BUFFER_FAILED,
-    TOTAL_READ_DATA, TOTAL_RECEIVED_DATA, TOTAL_REQUIRE_BUFFER_FAILED,
+    GAUGE_APP_NUMBER, GAUGE_TOPN_APP_RESIDENT_DATA_SIZE, TOTAL_APP_NUMBER,
+    TOTAL_HUGE_PARTITION_REQUIRE_BUFFER_FAILED, TOTAL_READ_DATA, 
TOTAL_READ_DATA_FROM_LOCALFILE,
+    TOTAL_READ_DATA_FROM_MEMORY, TOTAL_RECEIVED_DATA, 
TOTAL_REQUIRE_BUFFER_FAILED,
 };
 
 use crate::readable_size::ReadableSize;
@@ -47,6 +48,7 @@ use std::hash::{Hash, Hasher};
 use std::str::FromStr;
 
 use crate::proto::uniffle::RemoteStorage;
+use std::sync::atomic::Ordering::SeqCst;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::RwLock;
 use std::sync::{Arc, OnceLock};
@@ -130,6 +132,9 @@ pub struct App {
     bitmap_of_blocks: DashMap<(i32, i32), PartitionedMeta>,
     huge_partition_marked_threshold: Option<u64>,
     huge_partition_memory_max_available_size: Option<u64>,
+
+    total_received_data_size: AtomicU64,
+    total_resident_data_size: AtomicU64,
 }
 
 #[derive(Clone)]
@@ -220,6 +225,8 @@ impl App {
             bitmap_of_blocks: DashMap::new(),
             huge_partition_marked_threshold,
             huge_partition_memory_max_available_size,
+            total_received_data_size: Default::default(),
+            total_resident_data_size: Default::default(),
         }
     }
 
@@ -245,6 +252,8 @@ impl App {
         self.get_underlying_partition_bitmap(ctx.uid.clone())
             .incr_data_size(len)?;
         TOTAL_RECEIVED_DATA.inc_by(len as u64);
+        self.total_received_data_size.fetch_add(len as u64, SeqCst);
+        self.total_resident_data_size.fetch_add(len as u64, SeqCst);
 
         let ctx = match self.is_huge_partition(&ctx.uid).await {
             Ok(true) => WritingViewContext::new(ctx.uid.clone(), 
ctx.data_blocks, true),
@@ -258,11 +267,18 @@ impl App {
     pub async fn select(&self, ctx: ReadingViewContext) -> 
Result<ResponseData, WorkerError> {
         let response = self.store.get(ctx).await;
         response.map(|data| {
-            let length = match &data {
-                ResponseData::Local(local_data) => local_data.data.len() as 
u64,
-                ResponseData::Mem(mem_data) => mem_data.data.len() as u64,
+            match &data {
+                ResponseData::Local(local_data) => {
+                    let length = local_data.data.len() as u64;
+                    TOTAL_READ_DATA_FROM_LOCALFILE.inc_by(length);
+                    TOTAL_READ_DATA.inc_by(length);
+                }
+                ResponseData::Mem(mem_data) => {
+                    let length = mem_data.data.len() as u64;
+                    TOTAL_READ_DATA_FROM_MEMORY.inc_by(length);
+                    TOTAL_READ_DATA.inc_by(length);
+                }
             };
-            TOTAL_READ_DATA.inc_by(length);
 
             data
         })
@@ -368,9 +384,21 @@ impl App {
     }
 
     pub async fn purge(&self, app_id: String, shuffle_id: Option<i32>) -> 
Result<()> {
-        self.store
+        let removed_size = self
+            .store
             .purge(PurgeDataContext::new(app_id, shuffle_id))
-            .await
+            .await?;
+        self.total_resident_data_size
+            .fetch_sub(removed_size as u64, SeqCst);
+        Ok(())
+    }
+
+    pub fn total_received_data_size(&self) -> u64 {
+        self.total_received_data_size.load(SeqCst)
+    }
+
+    pub fn total_resident_data_size(&self) -> u64 {
+        self.total_resident_data_size.load(SeqCst)
     }
 }
 
@@ -563,6 +591,31 @@ impl AppManager {
             }
         });
 
+        // calculate topN app shuffle data size
+        let app_manager_ref = app_ref.clone();
+        runtime_manager.default_runtime.spawn(async move {
+            info!("Starting calculating topN app shuffle data size...");
+            loop {
+                tokio::time::sleep(Duration::from_secs(10)).await;
+
+                let view = app_manager_ref.apps.clone().into_read_only();
+                let mut apps: Vec<_> = view.values().collect();
+                apps.sort_by_key(|x| 0 - x.total_resident_data_size());
+
+                let top_n = 10;
+                let limit = if apps.len() > top_n {
+                    top_n
+                } else {
+                    apps.len()
+                };
+                for idx in 0..limit {
+                    GAUGE_TOPN_APP_RESIDENT_DATA_SIZE
+                        .with_label_values(&[&apps[idx].app_id])
+                        .set(apps[idx].total_resident_data_size() as i64);
+                }
+            }
+        });
+
         let app_manager_cloned = app_ref.clone();
         runtime_manager.default_runtime.spawn(async move {
             info!("Starting purge event handler...");
@@ -719,6 +772,7 @@ mod test {
     use crate::store::{PartitionedDataBlock, ResponseData};
     use croaring::treemap::JvmSerializer;
     use croaring::Treemap;
+    use dashmap::DashMap;
 
     #[test]
     fn test_uid_hash() {
@@ -802,12 +856,20 @@ mod test {
                 _ => todo!(),
             }
 
+            // check the data size
+            assert_eq!(30, app.total_received_data_size());
+            assert_eq!(30, app.total_resident_data_size());
+
             // case3: purge
             runtime_manager
                 .wait(app_manager_ref.purge_app_data(app_id.to_string(), None))
                 .expect("");
 
             assert_eq!(false, app_manager_ref.get_app(app_id).is_none());
+
+            // check the data size again after the data has been removed
+            assert_eq!(30, app.total_received_data_size());
+            assert_eq!(0, app.total_resident_data_size());
         }
     }
 
@@ -857,4 +919,20 @@ mod test {
         let deserialized = Treemap::deserialize(&data).unwrap();
         assert_eq!(deserialized, Treemap::from_iter(vec![123, 124]));
     }
+
+    #[test]
+    fn test_dashmap_values() {
+        let dashmap = DashMap::new();
+        dashmap.insert(1, 3);
+        dashmap.insert(2, 2);
+        dashmap.insert(3, 8);
+
+        let cloned = dashmap.clone().into_read_only();
+        let mut vals: Vec<_> = cloned.values().collect();
+        vals.sort_by_key(|x| -(*x));
+        assert_eq!(vec![&8, &3, &2], vals);
+
+        let apps = vec![0, 1, 2, 3];
+        println!("{:#?}", &apps[0..2]);
+    }
 }
diff --git a/rust/experimental/server/src/grpc.rs 
b/rust/experimental/server/src/grpc.rs
index e04447335..ad30e003f 100644
--- a/rust/experimental/server/src/grpc.rs
+++ b/rust/experimental/server/src/grpc.rs
@@ -698,6 +698,7 @@ impl ShuffleServer for DefaultShuffleServer {
 }
 
 pub mod metrics_middleware {
+    use crate::metric::GAUGE_GRPC_REQUEST_QUEUE_SIZE;
     use hyper::service::Service;
     use hyper::Body;
     use prometheus::HistogramVec;
@@ -746,6 +747,8 @@ pub mod metrics_middleware {
         }
 
         fn call(&mut self, req: hyper::Request<Body>) -> Self::Future {
+            GAUGE_GRPC_REQUEST_QUEUE_SIZE.inc();
+
             // This is necessary because tonic internally uses 
`tower::buffer::Buffer`.
             // See 
https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
             // for details on why this is necessary
@@ -762,6 +765,8 @@ pub mod metrics_middleware {
 
                 timer.observe_duration();
 
+                GAUGE_GRPC_REQUEST_QUEUE_SIZE.inc();
+
                 Ok(response)
             })
         }
diff --git a/rust/experimental/server/src/metric.rs 
b/rust/experimental/server/src/metric.rs
index 7c1fe2792..9347f3857 100644
--- a/rust/experimental/server/src/metric.rs
+++ b/rust/experimental/server/src/metric.rs
@@ -39,6 +39,19 @@ pub static TOTAL_READ_DATA: Lazy<IntCounter> = Lazy::new(|| {
     IntCounter::new("total_read_data", "Reading Data").expect("metric should 
be created")
 });
 
+pub static TOTAL_READ_DATA_FROM_MEMORY: Lazy<IntCounter> = Lazy::new(|| {
+    IntCounter::new("total_read_data_from_memory", "Reading Data from memory")
+        .expect("metric should be created")
+});
+
+pub static TOTAL_READ_DATA_FROM_LOCALFILE: Lazy<IntCounter> = Lazy::new(|| {
+    IntCounter::new(
+        "total_read_data_from_localfile",
+        "Reading Data from localfile",
+    )
+    .expect("metric should be created")
+});
+
 pub static GRPC_GET_MEMORY_DATA_TRANSPORT_TIME: Lazy<Histogram> = Lazy::new(|| 
{
     let opts = HistogramOpts::new("grpc_get_memory_data_transport_time", 
"none")
         .buckets(Vec::from(DEFAULT_BUCKETS as &'static [f64]));
@@ -175,6 +188,33 @@ pub static TOTAL_HUGE_PARTITION_REQUIRE_BUFFER_FAILED: 
Lazy<IntCounter> = Lazy::
     .expect("metrics should be created")
 });
 
+pub static GAUGE_LOCAL_DISK_CAPACITY: Lazy<IntGaugeVec> = Lazy::new(|| {
+    register_int_gauge_vec!(
+        "local_disk_capacity",
+        "local disk capacity for root path",
+        &["root"]
+    )
+    .unwrap()
+});
+
+pub static GAUGE_LOCAL_DISK_USED: Lazy<IntGaugeVec> = Lazy::new(|| {
+    register_int_gauge_vec!(
+        "local_disk_used",
+        "local disk used for root path",
+        &["root"]
+    )
+    .unwrap()
+});
+
+pub static GAUGE_LOCAL_DISK_IS_HEALTHY: Lazy<IntGaugeVec> = Lazy::new(|| {
+    register_int_gauge_vec!(
+        "local_disk_is_healthy",
+        "local disk is_healthy for root path",
+        &["root"]
+    )
+    .unwrap()
+});
+
 pub static GAUGE_RUNTIME_ALIVE_THREAD_NUM: Lazy<IntGaugeVec> = Lazy::new(|| {
     register_int_gauge_vec!(
         "runtime_thread_alive_gauge",
@@ -193,7 +233,50 @@ pub static GAUGE_RUNTIME_IDLE_THREAD_NUM: 
Lazy<IntGaugeVec> = Lazy::new(|| {
     .unwrap()
 });
 
+pub static GAUGE_TOPN_APP_RESIDENT_DATA_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| 
{
+    register_int_gauge_vec!(
+        "topN_app_resident_data_size",
+        "topN app resident data size",
+        &["app_id"]
+    )
+    .unwrap()
+});
+
+pub static GAUGE_IN_SPILL_DATA_SIZE: Lazy<IntGauge> =
+    Lazy::new(|| IntGauge::new("in_spill_data_size", "total data size in 
spill").unwrap());
+
+pub static GAUGE_GRPC_REQUEST_QUEUE_SIZE: Lazy<IntGauge> =
+    Lazy::new(|| IntGauge::new("grpc_request_queue_size", "grpc request queue 
size").unwrap());
+
 fn register_custom_metrics() {
+    REGISTRY
+        .register(Box::new(GAUGE_TOPN_APP_RESIDENT_DATA_SIZE.clone()))
+        .expect("");
+
+    REGISTRY
+        .register(Box::new(TOTAL_READ_DATA_FROM_LOCALFILE.clone()))
+        .expect("total_read_data must be registered");
+
+    REGISTRY
+        .register(Box::new(TOTAL_READ_DATA_FROM_MEMORY.clone()))
+        .expect("total_read_data must be registered");
+
+    REGISTRY
+        .register(Box::new(GAUGE_IN_SPILL_DATA_SIZE.clone()))
+        .expect("");
+
+    REGISTRY
+        .register(Box::new(GAUGE_LOCAL_DISK_CAPACITY.clone()))
+        .expect("");
+
+    REGISTRY
+        .register(Box::new(GAUGE_LOCAL_DISK_USED.clone()))
+        .expect("");
+
+    REGISTRY
+        .register(Box::new(GAUGE_LOCAL_DISK_IS_HEALTHY.clone()))
+        .expect("");
+
     REGISTRY
         .register(Box::new(GAUGE_RUNTIME_ALIVE_THREAD_NUM.clone()))
         .expect("");
diff --git a/rust/experimental/server/src/store/hdfs.rs 
b/rust/experimental/server/src/store/hdfs.rs
index 3a1b74c80..ce503d0e7 100644
--- a/rust/experimental/server/src/store/hdfs.rs
+++ b/rust/experimental/server/src/store/hdfs.rs
@@ -242,7 +242,7 @@ impl Store for HdfsStore {
         todo!()
     }
 
-    async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
+    async fn purge(&self, ctx: PurgeDataContext) -> Result<i64> {
         let app_id = ctx.app_id;
         let filesystem = self.app_remote_clients.get(&app_id).ok_or(
             WorkerError::HDFS_NATIVE_CLIENT_NOT_FOUND(app_id.to_string()),
@@ -260,9 +260,12 @@ impl Store for HdfsStore {
             .map(|entry| entry.key().to_string())
             .collect();
 
+        let mut removed_size = 0i64;
         for deleted_key in keys_to_delete {
             self.partition_file_locks.remove(&deleted_key);
-            self.partition_cached_meta.remove(&deleted_key);
+            if let Some(meta) = 
self.partition_cached_meta.remove(&deleted_key) {
+                removed_size += meta.1.data_len;
+            }
         }
 
         info!("The hdfs data for {} has been deleted", &dir);
@@ -273,7 +276,7 @@ impl Store for HdfsStore {
             self.app_remote_clients.remove(&app_id);
         }
 
-        Ok(())
+        Ok(removed_size)
     }
 
     async fn is_healthy(&self) -> Result<bool> {
diff --git a/rust/experimental/server/src/store/hybrid.rs 
b/rust/experimental/server/src/store/hybrid.rs
index cb29357fe..ce996841a 100644
--- a/rust/experimental/server/src/store/hybrid.rs
+++ b/rust/experimental/server/src/store/hybrid.rs
@@ -24,8 +24,9 @@ use crate::await_tree::AWAIT_TREE_REGISTRY;
 use crate::config::{Config, HybridStoreConfig, StorageType};
 use crate::error::WorkerError;
 use crate::metric::{
-    GAUGE_MEMORY_SPILL_OPERATION, GAUGE_MEMORY_SPILL_TO_HDFS, 
GAUGE_MEMORY_SPILL_TO_LOCALFILE,
-    TOTAL_MEMORY_SPILL_OPERATION, TOTAL_MEMORY_SPILL_OPERATION_FAILED, 
TOTAL_MEMORY_SPILL_TO_HDFS,
+    GAUGE_IN_SPILL_DATA_SIZE, GAUGE_MEMORY_SPILL_OPERATION, 
GAUGE_MEMORY_SPILL_TO_HDFS,
+    GAUGE_MEMORY_SPILL_TO_LOCALFILE, TOTAL_MEMORY_SPILL_OPERATION,
+    TOTAL_MEMORY_SPILL_OPERATION_FAILED, TOTAL_MEMORY_SPILL_TO_HDFS,
     TOTAL_MEMORY_SPILL_TO_LOCALFILE,
 };
 use crate::readable_size::ReadableSize;
@@ -379,6 +380,8 @@ impl Store for HybridStore {
                         for block in &message.ctx.data_blocks {
                             size += block.length as u64;
                         }
+
+                        GAUGE_IN_SPILL_DATA_SIZE.add(size as i64);
                         match store_cloned
                             .memory_spill_to_persistent_store(message.clone())
                             
.instrument_await("memory_spill_to_persistent_store.")
@@ -401,6 +404,7 @@ impl Store for HybridStore {
                             }
                         }
                         store_cloned.memory_spill_event_num.dec_by(1);
+                        GAUGE_IN_SPILL_DATA_SIZE.sub(size as i64);
                         GAUGE_MEMORY_SPILL_OPERATION.dec();
                         drop(concurrency_guarder);
                     }));
@@ -483,19 +487,21 @@ impl Store for HybridStore {
         self.hot_store.release_buffer(ctx).await
     }
 
-    async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
+    async fn purge(&self, ctx: PurgeDataContext) -> Result<i64> {
         let app_id = &ctx.app_id;
-        self.hot_store.purge(ctx.clone()).await?;
+        let mut removed_size = 0i64;
+
+        removed_size += self.hot_store.purge(ctx.clone()).await?;
         info!("Removed data of app:[{}] in hot store", app_id);
         if self.warm_store.is_some() {
-            self.warm_store.as_ref().unwrap().purge(ctx.clone()).await?;
+            removed_size += 
self.warm_store.as_ref().unwrap().purge(ctx.clone()).await?;
             info!("Removed data of app:[{}] in warm store", app_id);
         }
         if self.cold_store.is_some() {
-            self.cold_store.as_ref().unwrap().purge(ctx.clone()).await?;
+            removed_size += 
self.cold_store.as_ref().unwrap().purge(ctx.clone()).await?;
             info!("Removed data of app:[{}] in cold store", app_id);
         }
-        Ok(())
+        Ok(removed_size)
     }
 
     async fn is_healthy(&self) -> Result<bool> {
diff --git a/rust/experimental/server/src/store/local/disk.rs 
b/rust/experimental/server/src/store/local/disk.rs
index b8cd2c14a..7302903db 100644
--- a/rust/experimental/server/src/store/local/disk.rs
+++ b/rust/experimental/server/src/store/local/disk.rs
@@ -15,6 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::metric::{
+    GAUGE_LOCAL_DISK_CAPACITY, GAUGE_LOCAL_DISK_IS_HEALTHY, 
GAUGE_LOCAL_DISK_USED,
+};
 use crate::runtime::manager::RuntimeManager;
 use anyhow::{anyhow, Result};
 use await_tree::InstrumentAwait;
@@ -63,6 +66,8 @@ pub struct LocalDisk {
     is_corrupted: AtomicBool,
     is_healthy: AtomicBool,
     config: LocalDiskConfig,
+
+    capacity: u64,
 }
 
 impl LocalDisk {
@@ -75,13 +80,17 @@ impl LocalDisk {
         builder.root(&root);
         let operator: Operator = Operator::new(builder).unwrap().finish();
 
+        let disk_capacity =
+            Self::get_disk_capacity(&root).expect("Errors on getting disk 
capacity");
+
         let instance = LocalDisk {
-            root,
+            root: root.to_string(),
             operator,
             concurrency_limiter: Semaphore::new(config.max_concurrency as 
usize),
             is_corrupted: AtomicBool::new(false),
             is_healthy: AtomicBool::new(true),
             config,
+            capacity: disk_capacity,
         };
         let instance = Arc::new(instance);
 
@@ -92,6 +101,10 @@ impl LocalDisk {
             LocalDisk::loop_check_disk(cloned).await;
         });
 
+        GAUGE_LOCAL_DISK_CAPACITY
+            .with_label_values(&[&root])
+            .set(disk_capacity as i64);
+
         instance
     }
 
@@ -124,31 +137,45 @@ impl LocalDisk {
                 return;
             }
 
+            let root_ref = &local_disk.root;
+
             let check_succeed: Result<()> = 
LocalDisk::write_read_check(local_disk.clone()).await;
             if check_succeed.is_err() {
                 local_disk.mark_corrupted();
+                GAUGE_LOCAL_DISK_IS_HEALTHY
+                    .with_label_values(&[root_ref])
+                    .set(1i64);
                 error!(
                     "Errors on checking local disk corruption. err: {:#?}",
                     check_succeed.err()
                 );
             }
 
-            // check the capacity
-            let used_ratio = local_disk.get_disk_used_ratio();
-            if used_ratio.is_err() {
+            // get the disk used ratio.
+            let disk_capacity = local_disk.capacity;
+            let disk_available = Self::get_disk_available(root_ref);
+            if disk_available.is_err() {
                 error!(
-                    "Errors on getting the used ratio of the disk capacity. 
err: {:?}",
-                    used_ratio.err()
+                    "Errors on getting the available of the local disk. err: 
{:?}",
+                    disk_available.err()
                 );
                 continue;
             }
+            let disk_available = disk_available.unwrap();
+            let used_ratio = 1.0 - (disk_available as f64 / disk_capacity as 
f64);
+
+            GAUGE_LOCAL_DISK_USED
+                .with_label_values(&[root_ref])
+                .set((disk_capacity - disk_available) as i64);
 
-            let used_ratio = used_ratio.unwrap();
             if local_disk.is_healthy().unwrap()
                 && used_ratio > local_disk.config.high_watermark as f64
             {
                 warn!("Disk={} has been unhealthy.", &local_disk.root);
                 local_disk.mark_unhealthy();
+                GAUGE_LOCAL_DISK_IS_HEALTHY
+                    .with_label_values(&[root_ref])
+                    .set(1i64);
                 continue;
             }
 
@@ -157,6 +184,9 @@ impl LocalDisk {
             {
                 warn!("Disk={} has been healthy.", &local_disk.root);
                 local_disk.mark_healthy();
+                GAUGE_LOCAL_DISK_IS_HEALTHY
+                    .with_label_values(&[root_ref])
+                    .set(0i64);
                 continue;
             }
         }
@@ -248,11 +278,18 @@ impl LocalDisk {
         Ok(self.is_healthy.load(Ordering::SeqCst))
     }
 
-    fn get_disk_used_ratio(&self) -> Result<f64> {
+    fn get_disk_used_ratio(root: &str, capacity: u64) -> Result<f64> {
         // Get the total and available space in bytes
-        let available_space = fs2::available_space(&self.root)?;
-        let total_space = fs2::total_space(&self.root)?;
-        Ok(1.0 - (available_space as f64 / total_space as f64))
+        let available_space = fs2::available_space(root)?;
+        Ok(1.0 - (available_space as f64 / capacity as f64))
+    }
+
+    fn get_disk_capacity(root: &str) -> Result<u64> {
+        Ok(fs2::total_space(root)?)
+    }
+
+    fn get_disk_available(root: &str) -> Result<u64> {
+        Ok(fs2::available_space(root)?)
     }
 }
 
diff --git a/rust/experimental/server/src/store/localfile.rs 
b/rust/experimental/server/src/store/localfile.rs
index bb810dc78..782311fcf 100644
--- a/rust/experimental/server/src/store/localfile.rs
+++ b/rust/experimental/server/src/store/localfile.rs
@@ -364,7 +364,7 @@ impl Store for LocalFileStore {
         }))
     }
 
-    async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
+    async fn purge(&self, ctx: PurgeDataContext) -> Result<i64> {
         let app_id = ctx.app_id;
         let shuffle_id_option = ctx.shuffle_id;
 
@@ -385,11 +385,16 @@ impl Store for LocalFileStore {
             .map(|entry| entry.key().to_string())
             .collect();
 
+        let mut removed_data_size = 0i64;
         for key in keys_to_delete {
-            self.partition_locks.remove(&key);
+            let meta = self.partition_locks.remove(&key);
+            if let Some(x) = meta {
+                let size = x.1.pointer.load(Ordering::SeqCst);
+                removed_data_size += size;
+            }
         }
 
-        Ok(())
+        Ok(removed_data_size)
     }
 
     async fn is_healthy(&self) -> Result<bool> {
diff --git a/rust/experimental/server/src/store/memory.rs 
b/rust/experimental/server/src/store/memory.rs
index b1be8e5ae..a77ae0397 100644
--- a/rust/experimental/server/src/store/memory.rs
+++ b/rust/experimental/server/src/store/memory.rs
@@ -387,7 +387,7 @@ impl Store for MemoryStore {
         panic!("It should not be invoked.")
     }
 
-    async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
+    async fn purge(&self, ctx: PurgeDataContext) -> Result<i64> {
         let app_id = ctx.app_id;
         let shuffle_id_option = ctx.shuffle_id;
 
@@ -424,7 +424,7 @@ impl Store for MemoryStore {
             used, app_id, shuffle_id_option
         );
 
-        Ok(())
+        Ok(used)
     }
 
     async fn is_healthy(&self) -> Result<bool> {
diff --git a/rust/experimental/server/src/store/mod.rs 
b/rust/experimental/server/src/store/mod.rs
index 12a509d9b..29d15cd18 100644
--- a/rust/experimental/server/src/store/mod.rs
+++ b/rust/experimental/server/src/store/mod.rs
@@ -168,7 +168,7 @@ pub trait Store {
         &self,
         ctx: ReadingIndexViewContext,
     ) -> Result<ResponseDataIndex, WorkerError>;
-    async fn purge(&self, ctx: PurgeDataContext) -> Result<()>;
+    async fn purge(&self, ctx: PurgeDataContext) -> Result<i64>;
     async fn is_healthy(&self) -> Result<bool>;
 
     async fn require_buffer(

Reply via email to