This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new f6d1e7a57 [#1476] feat(rust): Provide dedicated unregister app rpc
interface (#1511)
f6d1e7a57 is described below
commit f6d1e7a579060b6e537f14155687935cfcdb5183
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Feb 9 08:47:29 2024 +0800
[#1476] feat(rust): Provide dedicated unregister app rpc interface (#1511)
### What changes were proposed in this pull request?
Provide a dedicated unregister-app RPC interface for the Rust-based server.
### Why are the changes needed?
Fix: #1476
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
---
rust/experimental/server/src/app.rs | 7 +++++-
rust/experimental/server/src/grpc.rs | 28 +++++++++++++++++++++---
rust/experimental/server/src/proto/uniffle.proto | 10 +++++++++
3 files changed, 41 insertions(+), 4 deletions(-)
diff --git a/rust/experimental/server/src/app.rs
b/rust/experimental/server/src/app.rs
index 7d5328cc9..57e79b552 100644
--- a/rust/experimental/server/src/app.rs
+++ b/rust/experimental/server/src/app.rs
@@ -726,12 +726,17 @@ impl AppManager {
app_ref.register_shuffle(shuffle_id)
}
- pub async fn unregister(&self, app_id: String, shuffle_id: i32) ->
Result<()> {
+ pub async fn unregister_shuffle(&self, app_id: String, shuffle_id: i32) ->
Result<()> {
self.sender
.send(PurgeEvent::APP_PARTIAL_SHUFFLES_PURGE(app_id, shuffle_id))
.await?;
Ok(())
}
+
+ pub async fn unregister_app(&self, app_id: String) -> Result<()> {
+ self.sender.send(PurgeEvent::APP_PURGE(app_id)).await?;
+ Ok(())
+ }
}
#[derive(Ord, PartialOrd, Eq, PartialEq, Default, Debug, Hash, Clone)]
diff --git a/rust/experimental/server/src/grpc.rs
b/rust/experimental/server/src/grpc.rs
index ad30e003f..7557512d7 100644
--- a/rust/experimental/server/src/grpc.rs
+++ b/rust/experimental/server/src/grpc.rs
@@ -29,8 +29,8 @@ use crate::proto::uniffle::{
GetShuffleResultRequest, GetShuffleResultResponse,
ReportShuffleResultRequest,
ReportShuffleResultResponse, RequireBufferRequest, RequireBufferResponse,
SendShuffleDataRequest, SendShuffleDataResponse, ShuffleCommitRequest,
ShuffleCommitResponse,
- ShuffleRegisterRequest, ShuffleRegisterResponse, ShuffleUnregisterRequest,
- ShuffleUnregisterResponse,
+ ShuffleRegisterRequest, ShuffleRegisterResponse,
ShuffleUnregisterByAppIdRequest,
+ ShuffleUnregisterByAppIdResponse, ShuffleUnregisterRequest,
ShuffleUnregisterResponse,
};
use crate::store::{PartitionedData, ResponseDataIndex};
use await_tree::InstrumentAwait;
@@ -125,7 +125,7 @@ impl ShuffleServer for DefaultShuffleServer {
);
let status_code = self
.app_manager_ref
- .unregister(app_id, shuffle_id)
+ .unregister_shuffle(app_id, shuffle_id)
.await
.map_or_else(|_e| StatusCode::INTERNAL_ERROR, |_|
StatusCode::SUCCESS);
@@ -135,6 +135,28 @@ impl ShuffleServer for DefaultShuffleServer {
}))
}
+ // Once unregister app accepted, the data could be purged.
+ async fn unregister_shuffle_by_app_id(
+ &self,
+ request: Request<ShuffleUnregisterByAppIdRequest>,
+ ) -> Result<Response<ShuffleUnregisterByAppIdResponse>, Status> {
+ let request = request.into_inner();
+ let app_id = request.app_id;
+
+ info!("Accepted unregister app rpc. app_id: {:?}", &app_id);
+
+ let code = self
+ .app_manager_ref
+ .unregister_app(app_id)
+ .await
+ .map_or_else(|_e| StatusCode::INTERNAL_ERROR, |_|
StatusCode::SUCCESS);
+
+ Ok(Response::new(ShuffleUnregisterByAppIdResponse {
+ status: code.into(),
+ ret_msg: "".to_string(),
+ }))
+ }
+
async fn send_shuffle_data(
&self,
request: Request<SendShuffleDataRequest>,
diff --git a/rust/experimental/server/src/proto/uniffle.proto
b/rust/experimental/server/src/proto/uniffle.proto
index 6604c782c..b394c3665 100644
--- a/rust/experimental/server/src/proto/uniffle.proto
+++ b/rust/experimental/server/src/proto/uniffle.proto
@@ -23,6 +23,7 @@ package rss.common;
service ShuffleServer {
rpc registerShuffle (ShuffleRegisterRequest) returns
(ShuffleRegisterResponse);
rpc unregisterShuffle(ShuffleUnregisterRequest) returns
(ShuffleUnregisterResponse);
+ rpc unregisterShuffleByAppId(ShuffleUnregisterByAppIdRequest) returns
(ShuffleUnregisterByAppIdResponse);
rpc sendShuffleData (SendShuffleDataRequest) returns
(SendShuffleDataResponse);
rpc getLocalShuffleIndex (GetLocalShuffleIndexRequest) returns
(GetLocalShuffleIndexResponse);
rpc getLocalShuffleData (GetLocalShuffleDataRequest) returns
(GetLocalShuffleDataResponse);
@@ -194,6 +195,15 @@ message ShuffleRegisterResponse {
string retMsg = 2;
}
+message ShuffleUnregisterByAppIdRequest {
+ string appId = 1;
+}
+
+message ShuffleUnregisterByAppIdResponse {
+ StatusCode status = 1;
+ string retMsg = 2;
+}
+
message SendShuffleDataRequest {
string appId = 1;
int32 shuffleId = 2;