This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new f6d1e7a57 [#1476] feat(rust): Provide dedicated unregister app rpc interface (#1511)
f6d1e7a57 is described below

commit f6d1e7a579060b6e537f14155687935cfcdb5183
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Feb 9 08:47:29 2024 +0800

    [#1476] feat(rust): Provide dedicated unregister app rpc interface (#1511)
    
    ### What changes were proposed in this pull request?
    
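    Provide a dedicated unregister-app RPC interface (`unregisterShuffleByAppId`)
    for the Rust-based server. `AppManager::unregister` is renamed to
    `unregister_shuffle`, and a new `AppManager::unregister_app` is added for
    the app-level request.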
    
    ### Why are the changes needed?
    
    Fix: #1476
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
---
 rust/experimental/server/src/app.rs              |  7 +++++-
 rust/experimental/server/src/grpc.rs             | 28 +++++++++++++++++++++---
 rust/experimental/server/src/proto/uniffle.proto | 10 +++++++++
 3 files changed, 41 insertions(+), 4 deletions(-)

diff --git a/rust/experimental/server/src/app.rs 
b/rust/experimental/server/src/app.rs
index 7d5328cc9..57e79b552 100644
--- a/rust/experimental/server/src/app.rs
+++ b/rust/experimental/server/src/app.rs
@@ -726,12 +726,17 @@ impl AppManager {
         app_ref.register_shuffle(shuffle_id)
     }
 
-    pub async fn unregister(&self, app_id: String, shuffle_id: i32) -> 
Result<()> {
+    pub async fn unregister_shuffle(&self, app_id: String, shuffle_id: i32) -> 
Result<()> {
         self.sender
             .send(PurgeEvent::APP_PARTIAL_SHUFFLES_PURGE(app_id, shuffle_id))
             .await?;
         Ok(())
     }
+
+    pub async fn unregister_app(&self, app_id: String) -> Result<()> {
+        self.sender.send(PurgeEvent::APP_PURGE(app_id)).await?;
+        Ok(())
+    }
 }
 
 #[derive(Ord, PartialOrd, Eq, PartialEq, Default, Debug, Hash, Clone)]
diff --git a/rust/experimental/server/src/grpc.rs 
b/rust/experimental/server/src/grpc.rs
index ad30e003f..7557512d7 100644
--- a/rust/experimental/server/src/grpc.rs
+++ b/rust/experimental/server/src/grpc.rs
@@ -29,8 +29,8 @@ use crate::proto::uniffle::{
     GetShuffleResultRequest, GetShuffleResultResponse, 
ReportShuffleResultRequest,
     ReportShuffleResultResponse, RequireBufferRequest, RequireBufferResponse,
     SendShuffleDataRequest, SendShuffleDataResponse, ShuffleCommitRequest, 
ShuffleCommitResponse,
-    ShuffleRegisterRequest, ShuffleRegisterResponse, ShuffleUnregisterRequest,
-    ShuffleUnregisterResponse,
+    ShuffleRegisterRequest, ShuffleRegisterResponse, 
ShuffleUnregisterByAppIdRequest,
+    ShuffleUnregisterByAppIdResponse, ShuffleUnregisterRequest, 
ShuffleUnregisterResponse,
 };
 use crate::store::{PartitionedData, ResponseDataIndex};
 use await_tree::InstrumentAwait;
@@ -125,7 +125,7 @@ impl ShuffleServer for DefaultShuffleServer {
         );
         let status_code = self
             .app_manager_ref
-            .unregister(app_id, shuffle_id)
+            .unregister_shuffle(app_id, shuffle_id)
             .await
             .map_or_else(|_e| StatusCode::INTERNAL_ERROR, |_| 
StatusCode::SUCCESS);
 
@@ -135,6 +135,28 @@ impl ShuffleServer for DefaultShuffleServer {
         }))
     }
 
+    // Once unregister app accepted, the data could be purged.
+    async fn unregister_shuffle_by_app_id(
+        &self,
+        request: Request<ShuffleUnregisterByAppIdRequest>,
+    ) -> Result<Response<ShuffleUnregisterByAppIdResponse>, Status> {
+        let request = request.into_inner();
+        let app_id = request.app_id;
+
+        info!("Accepted unregister app rpc. app_id: {:?}", &app_id);
+
+        let code = self
+            .app_manager_ref
+            .unregister_app(app_id)
+            .await
+            .map_or_else(|_e| StatusCode::INTERNAL_ERROR, |_| 
StatusCode::SUCCESS);
+
+        Ok(Response::new(ShuffleUnregisterByAppIdResponse {
+            status: code.into(),
+            ret_msg: "".to_string(),
+        }))
+    }
+
     async fn send_shuffle_data(
         &self,
         request: Request<SendShuffleDataRequest>,
diff --git a/rust/experimental/server/src/proto/uniffle.proto 
b/rust/experimental/server/src/proto/uniffle.proto
index 6604c782c..b394c3665 100644
--- a/rust/experimental/server/src/proto/uniffle.proto
+++ b/rust/experimental/server/src/proto/uniffle.proto
@@ -23,6 +23,7 @@ package rss.common;
 service ShuffleServer {
   rpc registerShuffle (ShuffleRegisterRequest) returns 
(ShuffleRegisterResponse);
   rpc unregisterShuffle(ShuffleUnregisterRequest) returns 
(ShuffleUnregisterResponse);
+  rpc unregisterShuffleByAppId(ShuffleUnregisterByAppIdRequest) returns 
(ShuffleUnregisterByAppIdResponse);
   rpc sendShuffleData (SendShuffleDataRequest) returns 
(SendShuffleDataResponse);
   rpc getLocalShuffleIndex (GetLocalShuffleIndexRequest) returns 
(GetLocalShuffleIndexResponse);
   rpc getLocalShuffleData (GetLocalShuffleDataRequest) returns 
(GetLocalShuffleDataResponse);
@@ -194,6 +195,15 @@ message ShuffleRegisterResponse {
   string retMsg = 2;
 }
 
+message ShuffleUnregisterByAppIdRequest {
+  string appId = 1;
+}
+
+message ShuffleUnregisterByAppIdResponse {
+  StatusCode status = 1;
+  string retMsg = 2;
+}
+
 message SendShuffleDataRequest {
   string appId = 1;
   int32 shuffleId = 2;

Reply via email to