This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 7d22e5f0c [#1407] feat(rust): support more metrics about disk and topN
data size (#1488)
7d22e5f0c is described below
commit 7d22e5f0cedd17f2b14f8d757ec25c5bc287fb39
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Jan 29 13:41:58 2024 +0800
[#1407] feat(rust): support more metrics about disk and topN data size
(#1488)
### What changes were proposed in this pull request?
1. feat(rust): add more metrics about local disk and reading
2. feat(rust): support topN app data size
### Why are the changes needed?
Fix: #1407
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests
---
rust/experimental/server/src/app.rs | 94 ++++++++++++++++++++++--
rust/experimental/server/src/grpc.rs | 5 ++
rust/experimental/server/src/metric.rs | 83 +++++++++++++++++++++
rust/experimental/server/src/store/hdfs.rs | 9 ++-
rust/experimental/server/src/store/hybrid.rs | 20 +++--
rust/experimental/server/src/store/local/disk.rs | 59 ++++++++++++---
rust/experimental/server/src/store/localfile.rs | 11 ++-
rust/experimental/server/src/store/memory.rs | 4 +-
rust/experimental/server/src/store/mod.rs | 2 +-
9 files changed, 252 insertions(+), 35 deletions(-)
diff --git a/rust/experimental/server/src/app.rs
b/rust/experimental/server/src/app.rs
index 35699f780..7d5328cc9 100644
--- a/rust/experimental/server/src/app.rs
+++ b/rust/experimental/server/src/app.rs
@@ -18,8 +18,9 @@
use crate::config::Config;
use crate::error::WorkerError;
use crate::metric::{
- GAUGE_APP_NUMBER, TOTAL_APP_NUMBER,
TOTAL_HUGE_PARTITION_REQUIRE_BUFFER_FAILED,
- TOTAL_READ_DATA, TOTAL_RECEIVED_DATA, TOTAL_REQUIRE_BUFFER_FAILED,
+ GAUGE_APP_NUMBER, GAUGE_TOPN_APP_RESIDENT_DATA_SIZE, TOTAL_APP_NUMBER,
+ TOTAL_HUGE_PARTITION_REQUIRE_BUFFER_FAILED, TOTAL_READ_DATA,
TOTAL_READ_DATA_FROM_LOCALFILE,
+ TOTAL_READ_DATA_FROM_MEMORY, TOTAL_RECEIVED_DATA,
TOTAL_REQUIRE_BUFFER_FAILED,
};
use crate::readable_size::ReadableSize;
@@ -47,6 +48,7 @@ use std::hash::{Hash, Hasher};
use std::str::FromStr;
use crate::proto::uniffle::RemoteStorage;
+use std::sync::atomic::Ordering::SeqCst;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;
use std::sync::{Arc, OnceLock};
@@ -130,6 +132,9 @@ pub struct App {
bitmap_of_blocks: DashMap<(i32, i32), PartitionedMeta>,
huge_partition_marked_threshold: Option<u64>,
huge_partition_memory_max_available_size: Option<u64>,
+
+ total_received_data_size: AtomicU64,
+ total_resident_data_size: AtomicU64,
}
#[derive(Clone)]
@@ -220,6 +225,8 @@ impl App {
bitmap_of_blocks: DashMap::new(),
huge_partition_marked_threshold,
huge_partition_memory_max_available_size,
+ total_received_data_size: Default::default(),
+ total_resident_data_size: Default::default(),
}
}
@@ -245,6 +252,8 @@ impl App {
self.get_underlying_partition_bitmap(ctx.uid.clone())
.incr_data_size(len)?;
TOTAL_RECEIVED_DATA.inc_by(len as u64);
+ self.total_received_data_size.fetch_add(len as u64, SeqCst);
+ self.total_resident_data_size.fetch_add(len as u64, SeqCst);
let ctx = match self.is_huge_partition(&ctx.uid).await {
Ok(true) => WritingViewContext::new(ctx.uid.clone(),
ctx.data_blocks, true),
@@ -258,11 +267,18 @@ impl App {
pub async fn select(&self, ctx: ReadingViewContext) ->
Result<ResponseData, WorkerError> {
let response = self.store.get(ctx).await;
response.map(|data| {
- let length = match &data {
- ResponseData::Local(local_data) => local_data.data.len() as
u64,
- ResponseData::Mem(mem_data) => mem_data.data.len() as u64,
+ match &data {
+ ResponseData::Local(local_data) => {
+ let length = local_data.data.len() as u64;
+ TOTAL_READ_DATA_FROM_LOCALFILE.inc_by(length);
+ TOTAL_READ_DATA.inc_by(length);
+ }
+ ResponseData::Mem(mem_data) => {
+ let length = mem_data.data.len() as u64;
+ TOTAL_READ_DATA_FROM_MEMORY.inc_by(length);
+ TOTAL_READ_DATA.inc_by(length);
+ }
};
- TOTAL_READ_DATA.inc_by(length);
data
})
@@ -368,9 +384,21 @@ impl App {
}
pub async fn purge(&self, app_id: String, shuffle_id: Option<i32>) ->
Result<()> {
- self.store
+ let removed_size = self
+ .store
.purge(PurgeDataContext::new(app_id, shuffle_id))
- .await
+ .await?;
+ self.total_resident_data_size
+ .fetch_sub(removed_size as u64, SeqCst);
+ Ok(())
+ }
+
+ pub fn total_received_data_size(&self) -> u64 {
+ self.total_received_data_size.load(SeqCst)
+ }
+
+ pub fn total_resident_data_size(&self) -> u64 {
+ self.total_resident_data_size.load(SeqCst)
}
}
@@ -563,6 +591,31 @@ impl AppManager {
}
});
+ // Background task: every 10 seconds, publish the resident data size of the
+ // topN (N = 10) apps through GAUGE_TOPN_APP_RESIDENT_DATA_SIZE.
+ let app_manager_ref = app_ref.clone();
+ runtime_manager.default_runtime.spawn(async move {
+ info!("Starting calculating topN app shuffle data size...");
+ loop {
+ tokio::time::sleep(Duration::from_secs(10)).await;
+
+ let view = app_manager_ref.apps.clone().into_read_only();
+ let mut apps: Vec<_> = view.values().collect();
+ // Sort descending by resident size. The size is a u64, so negating it
+ // via `0 - size` overflows for every non-zero value; use Reverse instead.
+ apps.sort_by_key(|x| std::cmp::Reverse(x.total_resident_data_size()));
+
+ let top_n = 10;
+ let limit = if apps.len() > top_n {
+ top_n
+ } else {
+ apps.len()
+ };
+ // NOTE(review): only the current topN labels are set here; labels of apps
+ // that dropped out of the topN are not reset or removed by this loop.
+ for idx in 0..limit {
+ GAUGE_TOPN_APP_RESIDENT_DATA_SIZE
+ .with_label_values(&[&apps[idx].app_id])
+ .set(apps[idx].total_resident_data_size() as i64);
+ }
+ }
+ });
+
let app_manager_cloned = app_ref.clone();
runtime_manager.default_runtime.spawn(async move {
info!("Starting purge event handler...");
@@ -719,6 +772,7 @@ mod test {
use crate::store::{PartitionedDataBlock, ResponseData};
use croaring::treemap::JvmSerializer;
use croaring::Treemap;
+ use dashmap::DashMap;
#[test]
fn test_uid_hash() {
@@ -802,12 +856,20 @@ mod test {
_ => todo!(),
}
+ // check the data size
+ assert_eq!(30, app.total_received_data_size());
+ assert_eq!(30, app.total_resident_data_size());
+
// case3: purge
runtime_manager
.wait(app_manager_ref.purge_app_data(app_id.to_string(), None))
.expect("");
assert_eq!(false, app_manager_ref.get_app(app_id).is_none());
+
+ // check the data size again after the data has been removed
+ assert_eq!(30, app.total_received_data_size());
+ assert_eq!(0, app.total_resident_data_size());
}
}
@@ -857,4 +919,20 @@ mod test {
let deserialized = Treemap::deserialize(&data).unwrap();
assert_eq!(deserialized, Treemap::from_iter(vec![123, 124]));
}
+
+ #[test]
+ fn test_dashmap_values() {
+ let dashmap = DashMap::new();
+ dashmap.insert(1, 3);
+ dashmap.insert(2, 2);
+ dashmap.insert(3, 8);
+
+ let cloned = dashmap.clone().into_read_only();
+ let mut vals: Vec<_> = cloned.values().collect();
+ vals.sort_by_key(|x| -(*x));
+ assert_eq!(vec![&8, &3, &2], vals);
+
+ let apps = vec![0, 1, 2, 3];
+ println!("{:#?}", &apps[0..2]);
+ }
}
diff --git a/rust/experimental/server/src/grpc.rs
b/rust/experimental/server/src/grpc.rs
index e04447335..ad30e003f 100644
--- a/rust/experimental/server/src/grpc.rs
+++ b/rust/experimental/server/src/grpc.rs
@@ -698,6 +698,7 @@ impl ShuffleServer for DefaultShuffleServer {
}
pub mod metrics_middleware {
+ use crate::metric::GAUGE_GRPC_REQUEST_QUEUE_SIZE;
use hyper::service::Service;
use hyper::Body;
use prometheus::HistogramVec;
@@ -746,6 +747,8 @@ pub mod metrics_middleware {
}
fn call(&mut self, req: hyper::Request<Body>) -> Self::Future {
+ GAUGE_GRPC_REQUEST_QUEUE_SIZE.inc();
+
// This is necessary because tonic internally uses
`tower::buffer::Buffer`.
// See
https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
// for details on why this is necessary
@@ -762,6 +765,8 @@ pub mod metrics_middleware {
timer.observe_duration();
+ // The request has completed: undo the inc() made on entry to call(),
+ // otherwise the gauge only ever grows.
+ GAUGE_GRPC_REQUEST_QUEUE_SIZE.dec();
+
Ok(response)
})
}
diff --git a/rust/experimental/server/src/metric.rs
b/rust/experimental/server/src/metric.rs
index 7c1fe2792..9347f3857 100644
--- a/rust/experimental/server/src/metric.rs
+++ b/rust/experimental/server/src/metric.rs
@@ -39,6 +39,19 @@ pub static TOTAL_READ_DATA: Lazy<IntCounter> = Lazy::new(|| {
IntCounter::new("total_read_data", "Reading Data").expect("metric should
be created")
});
+pub static TOTAL_READ_DATA_FROM_MEMORY: Lazy<IntCounter> = Lazy::new(|| {
+ IntCounter::new("total_read_data_from_memory", "Reading Data from memory")
+ .expect("metric should be created")
+});
+
+pub static TOTAL_READ_DATA_FROM_LOCALFILE: Lazy<IntCounter> = Lazy::new(|| {
+ IntCounter::new(
+ "total_read_data_from_localfile",
+ "Reading Data from localfile",
+ )
+ .expect("metric should be created")
+});
+
pub static GRPC_GET_MEMORY_DATA_TRANSPORT_TIME: Lazy<Histogram> = Lazy::new(||
{
let opts = HistogramOpts::new("grpc_get_memory_data_transport_time",
"none")
.buckets(Vec::from(DEFAULT_BUCKETS as &'static [f64]));
@@ -175,6 +188,33 @@ pub static TOTAL_HUGE_PARTITION_REQUIRE_BUFFER_FAILED:
Lazy<IntCounter> = Lazy::
.expect("metrics should be created")
});
+pub static GAUGE_LOCAL_DISK_CAPACITY: Lazy<IntGaugeVec> = Lazy::new(|| {
+ register_int_gauge_vec!(
+ "local_disk_capacity",
+ "local disk capacity for root path",
+ &["root"]
+ )
+ .unwrap()
+});
+
+pub static GAUGE_LOCAL_DISK_USED: Lazy<IntGaugeVec> = Lazy::new(|| {
+ register_int_gauge_vec!(
+ "local_disk_used",
+ "local disk used for root path",
+ &["root"]
+ )
+ .unwrap()
+});
+
+pub static GAUGE_LOCAL_DISK_IS_HEALTHY: Lazy<IntGaugeVec> = Lazy::new(|| {
+ register_int_gauge_vec!(
+ "local_disk_is_healthy",
+ "local disk is_healthy for root path",
+ &["root"]
+ )
+ .unwrap()
+});
+
pub static GAUGE_RUNTIME_ALIVE_THREAD_NUM: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"runtime_thread_alive_gauge",
@@ -193,7 +233,50 @@ pub static GAUGE_RUNTIME_IDLE_THREAD_NUM:
Lazy<IntGaugeVec> = Lazy::new(|| {
.unwrap()
});
+pub static GAUGE_TOPN_APP_RESIDENT_DATA_SIZE: Lazy<IntGaugeVec> = Lazy::new(||
{
+ register_int_gauge_vec!(
+ "topN_app_resident_data_size",
+ "topN app resident data size",
+ &["app_id"]
+ )
+ .unwrap()
+});
+
+pub static GAUGE_IN_SPILL_DATA_SIZE: Lazy<IntGauge> =
+ Lazy::new(|| IntGauge::new("in_spill_data_size", "total data size in
spill").unwrap());
+
+pub static GAUGE_GRPC_REQUEST_QUEUE_SIZE: Lazy<IntGauge> =
+ Lazy::new(|| IntGauge::new("grpc_request_queue_size", "grpc request queue
size").unwrap());
+
fn register_custom_metrics() {
+ REGISTRY
+ .register(Box::new(GAUGE_TOPN_APP_RESIDENT_DATA_SIZE.clone()))
+ .expect("");
+
+ // Each expect message names the metric being registered so a startup panic
+ // points at the right counter.
+ REGISTRY
+ .register(Box::new(TOTAL_READ_DATA_FROM_LOCALFILE.clone()))
+ .expect("total_read_data_from_localfile must be registered");
+
+ REGISTRY
+ .register(Box::new(TOTAL_READ_DATA_FROM_MEMORY.clone()))
+ .expect("total_read_data_from_memory must be registered");
+
+ REGISTRY
+ .register(Box::new(GAUGE_IN_SPILL_DATA_SIZE.clone()))
+ .expect("");
+
+ REGISTRY
+ .register(Box::new(GAUGE_LOCAL_DISK_CAPACITY.clone()))
+ .expect("");
+
+ REGISTRY
+ .register(Box::new(GAUGE_LOCAL_DISK_USED.clone()))
+ .expect("");
+
+ REGISTRY
+ .register(Box::new(GAUGE_LOCAL_DISK_IS_HEALTHY.clone()))
+ .expect("");
+
REGISTRY
.register(Box::new(GAUGE_RUNTIME_ALIVE_THREAD_NUM.clone()))
.expect("");
diff --git a/rust/experimental/server/src/store/hdfs.rs
b/rust/experimental/server/src/store/hdfs.rs
index 3a1b74c80..ce503d0e7 100644
--- a/rust/experimental/server/src/store/hdfs.rs
+++ b/rust/experimental/server/src/store/hdfs.rs
@@ -242,7 +242,7 @@ impl Store for HdfsStore {
todo!()
}
- async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
+ async fn purge(&self, ctx: PurgeDataContext) -> Result<i64> {
let app_id = ctx.app_id;
let filesystem = self.app_remote_clients.get(&app_id).ok_or(
WorkerError::HDFS_NATIVE_CLIENT_NOT_FOUND(app_id.to_string()),
@@ -260,9 +260,12 @@ impl Store for HdfsStore {
.map(|entry| entry.key().to_string())
.collect();
+ let mut removed_size = 0i64;
for deleted_key in keys_to_delete {
self.partition_file_locks.remove(&deleted_key);
- self.partition_cached_meta.remove(&deleted_key);
+ if let Some(meta) =
self.partition_cached_meta.remove(&deleted_key) {
+ removed_size += meta.1.data_len;
+ }
}
info!("The hdfs data for {} has been deleted", &dir);
@@ -273,7 +276,7 @@ impl Store for HdfsStore {
self.app_remote_clients.remove(&app_id);
}
- Ok(())
+ Ok(removed_size)
}
async fn is_healthy(&self) -> Result<bool> {
diff --git a/rust/experimental/server/src/store/hybrid.rs
b/rust/experimental/server/src/store/hybrid.rs
index cb29357fe..ce996841a 100644
--- a/rust/experimental/server/src/store/hybrid.rs
+++ b/rust/experimental/server/src/store/hybrid.rs
@@ -24,8 +24,9 @@ use crate::await_tree::AWAIT_TREE_REGISTRY;
use crate::config::{Config, HybridStoreConfig, StorageType};
use crate::error::WorkerError;
use crate::metric::{
- GAUGE_MEMORY_SPILL_OPERATION, GAUGE_MEMORY_SPILL_TO_HDFS,
GAUGE_MEMORY_SPILL_TO_LOCALFILE,
- TOTAL_MEMORY_SPILL_OPERATION, TOTAL_MEMORY_SPILL_OPERATION_FAILED,
TOTAL_MEMORY_SPILL_TO_HDFS,
+ GAUGE_IN_SPILL_DATA_SIZE, GAUGE_MEMORY_SPILL_OPERATION,
GAUGE_MEMORY_SPILL_TO_HDFS,
+ GAUGE_MEMORY_SPILL_TO_LOCALFILE, TOTAL_MEMORY_SPILL_OPERATION,
+ TOTAL_MEMORY_SPILL_OPERATION_FAILED, TOTAL_MEMORY_SPILL_TO_HDFS,
TOTAL_MEMORY_SPILL_TO_LOCALFILE,
};
use crate::readable_size::ReadableSize;
@@ -379,6 +380,8 @@ impl Store for HybridStore {
for block in &message.ctx.data_blocks {
size += block.length as u64;
}
+
+ GAUGE_IN_SPILL_DATA_SIZE.add(size as i64);
match store_cloned
.memory_spill_to_persistent_store(message.clone())
.instrument_await("memory_spill_to_persistent_store.")
@@ -401,6 +404,7 @@ impl Store for HybridStore {
}
}
store_cloned.memory_spill_event_num.dec_by(1);
+ GAUGE_IN_SPILL_DATA_SIZE.sub(size as i64);
GAUGE_MEMORY_SPILL_OPERATION.dec();
drop(concurrency_guarder);
}));
@@ -483,19 +487,21 @@ impl Store for HybridStore {
self.hot_store.release_buffer(ctx).await
}
- async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
+ async fn purge(&self, ctx: PurgeDataContext) -> Result<i64> {
let app_id = &ctx.app_id;
- self.hot_store.purge(ctx.clone()).await?;
+ let mut removed_size = 0i64;
+
+ removed_size += self.hot_store.purge(ctx.clone()).await?;
info!("Removed data of app:[{}] in hot store", app_id);
if self.warm_store.is_some() {
- self.warm_store.as_ref().unwrap().purge(ctx.clone()).await?;
+ removed_size +=
self.warm_store.as_ref().unwrap().purge(ctx.clone()).await?;
info!("Removed data of app:[{}] in warm store", app_id);
}
if self.cold_store.is_some() {
- self.cold_store.as_ref().unwrap().purge(ctx.clone()).await?;
+ removed_size +=
self.cold_store.as_ref().unwrap().purge(ctx.clone()).await?;
info!("Removed data of app:[{}] in cold store", app_id);
}
- Ok(())
+ Ok(removed_size)
}
async fn is_healthy(&self) -> Result<bool> {
diff --git a/rust/experimental/server/src/store/local/disk.rs
b/rust/experimental/server/src/store/local/disk.rs
index b8cd2c14a..7302903db 100644
--- a/rust/experimental/server/src/store/local/disk.rs
+++ b/rust/experimental/server/src/store/local/disk.rs
@@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.
+use crate::metric::{
+ GAUGE_LOCAL_DISK_CAPACITY, GAUGE_LOCAL_DISK_IS_HEALTHY,
GAUGE_LOCAL_DISK_USED,
+};
use crate::runtime::manager::RuntimeManager;
use anyhow::{anyhow, Result};
use await_tree::InstrumentAwait;
@@ -63,6 +66,8 @@ pub struct LocalDisk {
is_corrupted: AtomicBool,
is_healthy: AtomicBool,
config: LocalDiskConfig,
+
+ capacity: u64,
}
impl LocalDisk {
@@ -75,13 +80,17 @@ impl LocalDisk {
builder.root(&root);
let operator: Operator = Operator::new(builder).unwrap().finish();
+ let disk_capacity =
+ Self::get_disk_capacity(&root).expect("Errors on getting disk
capacity");
+
let instance = LocalDisk {
- root,
+ root: root.to_string(),
operator,
concurrency_limiter: Semaphore::new(config.max_concurrency as
usize),
is_corrupted: AtomicBool::new(false),
is_healthy: AtomicBool::new(true),
config,
+ capacity: disk_capacity,
};
let instance = Arc::new(instance);
@@ -92,6 +101,10 @@ impl LocalDisk {
LocalDisk::loop_check_disk(cloned).await;
});
+ GAUGE_LOCAL_DISK_CAPACITY
+ .with_label_values(&[&root])
+ .set(disk_capacity as i64);
+
instance
}
@@ -124,31 +137,45 @@ impl LocalDisk {
return;
}
+ let root_ref = &local_disk.root;
+
let check_succeed: Result<()> =
LocalDisk::write_read_check(local_disk.clone()).await;
if check_succeed.is_err() {
local_disk.mark_corrupted();
+ GAUGE_LOCAL_DISK_IS_HEALTHY
+ .with_label_values(&[root_ref])
+ .set(1i64);
error!(
"Errors on checking local disk corruption. err: {:#?}",
check_succeed.err()
);
}
- // check the capacity
- let used_ratio = local_disk.get_disk_used_ratio();
- if used_ratio.is_err() {
+ // get the disk used ratio.
+ let disk_capacity = local_disk.capacity;
+ let disk_available = Self::get_disk_available(root_ref);
+ if disk_available.is_err() {
error!(
- "Errors on getting the used ratio of the disk capacity.
err: {:?}",
- used_ratio.err()
+ "Errors on getting the available of the local disk. err:
{:?}",
+ disk_available.err()
);
continue;
}
+ let disk_available = disk_available.unwrap();
+ let used_ratio = 1.0 - (disk_available as f64 / disk_capacity as
f64);
+
+ GAUGE_LOCAL_DISK_USED
+ .with_label_values(&[root_ref])
+ .set((disk_capacity - disk_available) as i64);
- let used_ratio = used_ratio.unwrap();
if local_disk.is_healthy().unwrap()
&& used_ratio > local_disk.config.high_watermark as f64
{
warn!("Disk={} has been unhealthy.", &local_disk.root);
local_disk.mark_unhealthy();
+ GAUGE_LOCAL_DISK_IS_HEALTHY
+ .with_label_values(&[root_ref])
+ .set(1i64);
continue;
}
@@ -157,6 +184,9 @@ impl LocalDisk {
{
warn!("Disk={} has been healthy.", &local_disk.root);
local_disk.mark_healthy();
+ GAUGE_LOCAL_DISK_IS_HEALTHY
+ .with_label_values(&[root_ref])
+ .set(0i64);
continue;
}
}
@@ -248,11 +278,18 @@ impl LocalDisk {
Ok(self.is_healthy.load(Ordering::SeqCst))
}
- fn get_disk_used_ratio(&self) -> Result<f64> {
+ fn get_disk_used_ratio(root: &str, capacity: u64) -> Result<f64> {
// Get the total and available space in bytes
- let available_space = fs2::available_space(&self.root)?;
- let total_space = fs2::total_space(&self.root)?;
- Ok(1.0 - (available_space as f64 / total_space as f64))
+ let available_space = fs2::available_space(root)?;
+ Ok(1.0 - (available_space as f64 / capacity as f64))
+ }
+
+ fn get_disk_capacity(root: &str) -> Result<u64> {
+ Ok(fs2::total_space(root)?)
+ }
+
+ fn get_disk_available(root: &str) -> Result<u64> {
+ Ok(fs2::available_space(root)?)
}
}
diff --git a/rust/experimental/server/src/store/localfile.rs
b/rust/experimental/server/src/store/localfile.rs
index bb810dc78..782311fcf 100644
--- a/rust/experimental/server/src/store/localfile.rs
+++ b/rust/experimental/server/src/store/localfile.rs
@@ -364,7 +364,7 @@ impl Store for LocalFileStore {
}))
}
- async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
+ async fn purge(&self, ctx: PurgeDataContext) -> Result<i64> {
let app_id = ctx.app_id;
let shuffle_id_option = ctx.shuffle_id;
@@ -385,11 +385,16 @@ impl Store for LocalFileStore {
.map(|entry| entry.key().to_string())
.collect();
+ let mut removed_data_size = 0i64;
for key in keys_to_delete {
- self.partition_locks.remove(&key);
+ let meta = self.partition_locks.remove(&key);
+ if let Some(x) = meta {
+ let size = x.1.pointer.load(Ordering::SeqCst);
+ removed_data_size += size;
+ }
}
- Ok(())
+ Ok(removed_data_size)
}
async fn is_healthy(&self) -> Result<bool> {
diff --git a/rust/experimental/server/src/store/memory.rs
b/rust/experimental/server/src/store/memory.rs
index b1be8e5ae..a77ae0397 100644
--- a/rust/experimental/server/src/store/memory.rs
+++ b/rust/experimental/server/src/store/memory.rs
@@ -387,7 +387,7 @@ impl Store for MemoryStore {
panic!("It should not be invoked.")
}
- async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
+ async fn purge(&self, ctx: PurgeDataContext) -> Result<i64> {
let app_id = ctx.app_id;
let shuffle_id_option = ctx.shuffle_id;
@@ -424,7 +424,7 @@ impl Store for MemoryStore {
used, app_id, shuffle_id_option
);
- Ok(())
+ Ok(used)
}
async fn is_healthy(&self) -> Result<bool> {
diff --git a/rust/experimental/server/src/store/mod.rs
b/rust/experimental/server/src/store/mod.rs
index 12a509d9b..29d15cd18 100644
--- a/rust/experimental/server/src/store/mod.rs
+++ b/rust/experimental/server/src/store/mod.rs
@@ -168,7 +168,7 @@ pub trait Store {
&self,
ctx: ReadingIndexViewContext,
) -> Result<ResponseDataIndex, WorkerError>;
- async fn purge(&self, ctx: PurgeDataContext) -> Result<()>;
+ async fn purge(&self, ctx: PurgeDataContext) -> Result<i64>;
async fn is_healthy(&self) -> Result<bool>;
async fn require_buffer(