This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c196ebbd [#1407] fix(rust): drop events and release memory when 
errors happened (#1509)
3c196ebbd is described below

commit 3c196ebbd7c41448ea458cf9bdf74850f0820c20
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Feb 6 15:54:56 2024 +0800

    [#1407] fix(rust): drop events and release memory when errors happened (#1509)
    
    ### What changes were proposed in this pull request?
    
    Drop spill events and release their in-memory data when errors happen.
    
    ### Why are the changes needed?
    
    Dropping spill events that cannot be persisted releases their memory, which keeps the service stable.
    Subtask of #1407.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Online tests
---
 rust/experimental/server/src/error.rs        |  3 ++
 rust/experimental/server/src/metric.rs       | 12 +++++++
 rust/experimental/server/src/store/hybrid.rs | 48 +++++++++++++++++++---------
 3 files changed, 48 insertions(+), 15 deletions(-)

diff --git a/rust/experimental/server/src/error.rs 
b/rust/experimental/server/src/error.rs
index bb0c1a18e..7a7040882 100644
--- a/rust/experimental/server/src/error.rs
+++ b/rust/experimental/server/src/error.rs
@@ -60,6 +60,9 @@ pub enum WorkerError {
 
     #[error("Data should be read from hdfs in client side instead of from 
server side")]
     NOT_READ_HDFS_DATA_FROM_SERVER,
+
+    #[error("Spill event has been retried exceed the max limit for app: {0}")]
+    SPILL_EVENT_EXCEED_RETRY_MAX_LIMIT(String),
 }
 
 impl From<AcquireError> for WorkerError {
diff --git a/rust/experimental/server/src/metric.rs 
b/rust/experimental/server/src/metric.rs
index 9347f3857..03e53bc07 100644
--- a/rust/experimental/server/src/metric.rs
+++ b/rust/experimental/server/src/metric.rs
@@ -248,7 +248,19 @@ pub static GAUGE_IN_SPILL_DATA_SIZE: Lazy<IntGauge> =
 pub static GAUGE_GRPC_REQUEST_QUEUE_SIZE: Lazy<IntGauge> =
     Lazy::new(|| IntGauge::new("grpc_request_queue_size", "grpc request queue 
size").unwrap());
 
+pub static TOTAL_SPILL_EVENTS_DROPPED: Lazy<IntCounter> = Lazy::new(|| {
+    IntCounter::new(
+        "total_spill_events_dropped",
+        "total spill events dropped number",
+    )
+    .expect("")
+});
+
 fn register_custom_metrics() {
+    REGISTRY
+        .register(Box::new(TOTAL_SPILL_EVENTS_DROPPED.clone()))
+        .expect("");
+
     REGISTRY
         .register(Box::new(GAUGE_TOPN_APP_RESIDENT_DATA_SIZE.clone()))
         .expect("");
diff --git a/rust/experimental/server/src/store/hybrid.rs 
b/rust/experimental/server/src/store/hybrid.rs
index 74176072c..76bb3e83d 100644
--- a/rust/experimental/server/src/store/hybrid.rs
+++ b/rust/experimental/server/src/store/hybrid.rs
@@ -27,7 +27,7 @@ use crate::metric::{
     GAUGE_IN_SPILL_DATA_SIZE, GAUGE_MEMORY_SPILL_OPERATION, 
GAUGE_MEMORY_SPILL_TO_HDFS,
     GAUGE_MEMORY_SPILL_TO_LOCALFILE, TOTAL_MEMORY_SPILL_OPERATION,
     TOTAL_MEMORY_SPILL_OPERATION_FAILED, TOTAL_MEMORY_SPILL_TO_HDFS,
-    TOTAL_MEMORY_SPILL_TO_LOCALFILE,
+    TOTAL_MEMORY_SPILL_TO_LOCALFILE, TOTAL_SPILL_EVENTS_DROPPED,
 };
 use crate::readable_size::ReadableSize;
 #[cfg(feature = "hdfs")]
@@ -41,7 +41,7 @@ use crate::store::{
 use anyhow::{anyhow, Result};
 
 use async_trait::async_trait;
-use log::{debug, error, info};
+use log::{debug, error, info, warn};
 use prometheus::core::{Atomic, AtomicU64};
 use std::any::Any;
 
@@ -177,10 +177,13 @@ impl HybridStore {
         spill_message: SpillMessage,
     ) -> Result<String, WorkerError> {
         let mut ctx: WritingViewContext = spill_message.ctx;
-        let in_flight_blocks_id: i64 = spill_message.id;
         let retry_cnt = spill_message.retry_cnt;
 
-        let uid = ctx.uid.clone();
+        if retry_cnt > 3 {
+            let app_id = ctx.uid.app_id;
+            return 
Err(WorkerError::SPILL_EVENT_EXCEED_RETRY_MAX_LIMIT(app_id));
+        }
+
         let blocks = &ctx.data_blocks;
         let mut spill_size = 0i64;
         for block in blocks {
@@ -254,11 +257,6 @@ impl HybridStore {
             }
         }
 
-        self.hot_store
-            .release_in_flight_blocks_in_underlying_staging_buffer(uid, 
in_flight_blocks_id)
-            .await?;
-        self.hot_store.free_used(spill_size).await?;
-
         match candidate_store.name().await {
             StorageType::LOCALFILE => {
                 GAUGE_MEMORY_SPILL_TO_LOCALFILE.dec();
@@ -317,6 +315,17 @@ impl HybridStore {
 
         Ok(())
     }
+
+    async fn release_data_in_memory(&self, data_size: u64, message: 
&SpillMessage) -> Result<()> {
+        let uid = &message.ctx.uid;
+        let in_flight_id = message.id;
+        self.hot_store
+            
.release_in_flight_blocks_in_underlying_staging_buffer(uid.clone(), 
in_flight_id)
+            .await?;
+        self.hot_store.free_used(data_size as i64).await?;
+        self.hot_store.desc_to_in_flight_buffer_size(data_size);
+        Ok(())
+    }
 }
 
 #[async_trait]
@@ -360,7 +369,7 @@ impl Store for HybridStore {
 
                 TOTAL_MEMORY_SPILL_OPERATION.inc();
                 GAUGE_MEMORY_SPILL_OPERATION.inc();
-                let store_cloned = store.clone();
+                let store_ref = store.clone();
                 store
                     .runtime_manager
                     .write_runtime
@@ -371,14 +380,23 @@ impl Store for HybridStore {
                         }
 
                         GAUGE_IN_SPILL_DATA_SIZE.add(size as i64);
-                        match store_cloned
+                        match store_ref
                             .memory_spill_to_persistent_store(message.clone())
                             
.instrument_await("memory_spill_to_persistent_store.")
                             .await
                         {
                             Ok(msg) => {
-                                
store_cloned.hot_store.desc_to_in_flight_buffer_size(size);
-                                debug!("{}", msg)
+                                debug!("{}", msg);
+                                if let Err(err) = 
store_ref.release_data_in_memory(size, &message).await {
+                                    error!("Errors on releasing memory data, 
that should not happen. err: {:#?}", err);
+                                }
+                            }
+                            
Err(WorkerError::SPILL_EVENT_EXCEED_RETRY_MAX_LIMIT(_)) | 
Err(WorkerError::PARTIAL_DATA_LOST(_)) => {
+                                warn!("Dropping the spill event for app: {:?}. 
Attention: this will make data lost!", message.ctx.uid.app_id);
+                                if let Err(err) = 
store_ref.release_data_in_memory(size, &message).await {
+                                    error!("Errors on releasing memory data 
when dropping the spill event, that should not happen. err: {:#?}", err);
+                                }
+                                TOTAL_SPILL_EVENTS_DROPPED.inc();
                             }
                             Err(error) => {
                                 TOTAL_MEMORY_SPILL_OPERATION_FAILED.inc();
@@ -389,10 +407,10 @@ impl Store for HybridStore {
 
                                 message.retry_cnt = message.retry_cnt + 1;
                                 // re-push to the queue to execute
-                                let _ = 
store_cloned.memory_spill_send.send(message).await;
+                                let _ = 
store_ref.memory_spill_send.send(message).await;
                             }
                         }
-                        store_cloned.memory_spill_event_num.dec_by(1);
+                        store_ref.memory_spill_event_num.dec_by(1);
                         GAUGE_IN_SPILL_DATA_SIZE.sub(size as i64);
                         GAUGE_MEMORY_SPILL_OPERATION.dec();
                         drop(concurrency_guarder);

Reply via email to