This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 9471cba43 [#1476] feat(spark): Provide dedicated unregister app rpc 
interface (#1510)
9471cba43 is described below

commit 9471cba436d0cec604a780faa01da5d1341fef5a
Author: Yiqi You <[email protected]>
AuthorDate: Thu Feb 8 09:26:28 2024 +0800

    [#1476] feat(spark): Provide dedicated unregister app rpc interface (#1510)
    
    ### What changes were proposed in this pull request?
    Introduce a dedicated unregisterApp RPC interface that is called only once per
    shuffle server when an application's shuffles are unregistered.
    
    ### Why are the changes needed?
    Fix: [#1476](https://github.com/apache/incubator-uniffle/issues/1476)
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    UT
---
 .../client/impl/ShuffleWriteClientImpl.java        | 40 +++++++++++++++++++++-
 .../uniffle/client/api/ShuffleServerClient.java    |  5 +++
 .../client/impl/grpc/ShuffleServerGrpcClient.java  | 33 ++++++++++++++++++
 .../RssUnregisterShuffleByAppIdRequest.java        | 30 ++++++++++++++++
 .../RssUnregisterShuffleByAppIdResponse.java       | 27 +++++++++++++++
 proto/src/main/proto/Rss.proto                     | 10 ++++++
 .../uniffle/server/ShuffleServerGrpcService.java   | 24 +++++++++++++
 .../apache/uniffle/server/ShuffleTaskManager.java  | 11 ++++++
 .../server/event/AppUnregisterPurgeEvent.java      | 24 +++++++++++++
 9 files changed, 203 insertions(+), 1 deletion(-)

diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index faa1f247c..337869d22 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -60,6 +60,7 @@ import 
org.apache.uniffle.client.request.RssRegisterShuffleRequest;
 import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
 import org.apache.uniffle.client.request.RssSendCommitRequest;
 import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.client.request.RssUnregisterShuffleByAppIdRequest;
 import org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
 import org.apache.uniffle.client.response.ClientResponse;
 import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
@@ -73,6 +74,7 @@ import 
org.apache.uniffle.client.response.RssRegisterShuffleResponse;
 import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
 import org.apache.uniffle.client.response.RssSendCommitResponse;
 import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
+import org.apache.uniffle.client.response.RssUnregisterShuffleByAppIdResponse;
 import org.apache.uniffle.client.response.RssUnregisterShuffleResponse;
 import org.apache.uniffle.client.response.SendShuffleDataResult;
 import org.apache.uniffle.client.util.ClientUtils;
@@ -984,6 +986,8 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
 
   @Override
   public void unregisterShuffle(String appId) {
+    RssUnregisterShuffleByAppIdRequest request = new 
RssUnregisterShuffleByAppIdRequest(appId);
+
     if (appId == null) {
       return;
     }
@@ -991,7 +995,41 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
     if (appServerMap == null) {
       return;
     }
-    appServerMap.keySet().forEach(shuffleId -> unregisterShuffle(appId, 
shuffleId));
+    Set<ShuffleServerInfo> shuffleServerInfos = getAllShuffleServers(appId);
+
+    ExecutorService executorService = null;
+    try {
+      executorService =
+          ThreadUtils.getDaemonFixedThreadPool(
+              Math.min(unregisterThreadPoolSize, shuffleServerInfos.size()), 
"unregister-shuffle");
+
+      ThreadUtils.executeTasks(
+          executorService,
+          shuffleServerInfos,
+          shuffleServerInfo -> {
+            try {
+              ShuffleServerClient client =
+                  ShuffleServerClientFactory.getInstance()
+                      .getShuffleServerClient(clientType, shuffleServerInfo, 
rssConf);
+              RssUnregisterShuffleByAppIdResponse response =
+                  client.unregisterShuffleByAppId(request);
+              if (response.getStatusCode() != StatusCode.SUCCESS) {
+                LOG.warn("Failed to unregister shuffle to " + 
shuffleServerInfo);
+              }
+            } catch (Exception e) {
+              LOG.warn("Error happened when unregistering to " + 
shuffleServerInfo, e);
+            }
+            return null;
+          },
+          unregisterRequestTimeSec,
+          "unregister shuffle server");
+
+    } finally {
+      if (executorService != null) {
+        executorService.shutdownNow();
+      }
+      shuffleServerInfoMap.remove(appId);
+    }
   }
 
   private void throwExceptionIfNecessary(ClientResponse response, String 
errorMsg) {
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
index eb4cd398f..8be3d67cb 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
@@ -28,6 +28,7 @@ import 
org.apache.uniffle.client.request.RssRegisterShuffleRequest;
 import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
 import org.apache.uniffle.client.request.RssSendCommitRequest;
 import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.client.request.RssUnregisterShuffleByAppIdRequest;
 import org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
 import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
 import org.apache.uniffle.client.response.RssFinishShuffleResponse;
@@ -39,6 +40,7 @@ import 
org.apache.uniffle.client.response.RssRegisterShuffleResponse;
 import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
 import org.apache.uniffle.client.response.RssSendCommitResponse;
 import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
+import org.apache.uniffle.client.response.RssUnregisterShuffleByAppIdResponse;
 import org.apache.uniffle.client.response.RssUnregisterShuffleResponse;
 
 public interface ShuffleServerClient {
@@ -47,6 +49,9 @@ public interface ShuffleServerClient {
 
   RssRegisterShuffleResponse registerShuffle(RssRegisterShuffleRequest 
request);
 
+  RssUnregisterShuffleByAppIdResponse unregisterShuffleByAppId(
+      RssUnregisterShuffleByAppIdRequest request);
+
   RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest 
request);
 
   RssSendCommitResponse sendCommit(RssSendCommitRequest request);
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 6bae6f62c..7297aec9d 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -45,6 +45,7 @@ import 
org.apache.uniffle.client.request.RssRegisterShuffleRequest;
 import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
 import org.apache.uniffle.client.request.RssSendCommitRequest;
 import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.client.request.RssUnregisterShuffleByAppIdRequest;
 import org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
 import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
 import org.apache.uniffle.client.response.RssFinishShuffleResponse;
@@ -56,6 +57,7 @@ import 
org.apache.uniffle.client.response.RssRegisterShuffleResponse;
 import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
 import org.apache.uniffle.client.response.RssSendCommitResponse;
 import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
+import org.apache.uniffle.client.response.RssUnregisterShuffleByAppIdResponse;
 import org.apache.uniffle.client.response.RssUnregisterShuffleResponse;
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.PartitionRange;
@@ -328,6 +330,37 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
     return result;
   }
 
+  private RssProtos.ShuffleUnregisterByAppIdResponse doUnregisterShuffleByAppId(String appId) {
+    // Build the protobuf request inline and send it over the blocking stub.
+    return blockingStub.unregisterShuffleByAppId(
+        RssProtos.ShuffleUnregisterByAppIdRequest.newBuilder().setAppId(appId).build());
+  }
+
+  /**
+   * Sends the unregister-by-appId RPC to the server at host:port and converts the reply.
+   *
+   * @param request carries the id of the application to unregister
+   * @return a response holding {@code StatusCode.SUCCESS} when the server reports SUCCESS
+   * @throws RssException if the server replies with any status other than SUCCESS
+   */
+  @Override
+  public RssUnregisterShuffleByAppIdResponse unregisterShuffleByAppId(
+      RssUnregisterShuffleByAppIdRequest request) {
+    final String appId = request.getAppId();
+    RssProtos.ShuffleUnregisterByAppIdResponse reply = doUnregisterShuffleByAppId(appId);
+    if (reply.getStatus() == RssProtos.StatusCode.SUCCESS) {
+      return new RssUnregisterShuffleByAppIdResponse(StatusCode.SUCCESS);
+    }
+
+    // Every non-success status is logged and surfaced to the caller as an exception.
+    String failure =
+        String.format(
+            "Errors on unregister app to %s:%s for appId[%s], error: %s",
+            host, port, appId, reply.getRetMsg());
+    LOG.error(failure);
+    throw new RssException(failure);
+  }
+
   private RssProtos.ShuffleUnregisterResponse doUnregisterShuffle(String 
appId, int shuffleId) {
     RssProtos.ShuffleUnregisterRequest request =
         RssProtos.ShuffleUnregisterRequest.newBuilder()
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterShuffleByAppIdRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterShuffleByAppIdRequest.java
new file mode 100644
index 000000000..0992355a5
--- /dev/null
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterShuffleByAppIdRequest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.request;
+
+public class RssUnregisterShuffleByAppIdRequest { // payload of the unregister-by-appId RPC
+  private final String appId; // set once by the constructor, hence final
+
+  public RssUnregisterShuffleByAppIdRequest(String appId) {
+    this.appId = appId; // stored as given; no null or format validation happens here
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+}
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleByAppIdResponse.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleByAppIdResponse.java
new file mode 100644
index 000000000..5c01e84e9
--- /dev/null
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleByAppIdResponse.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.response;
+
+import org.apache.uniffle.common.rpc.StatusCode;
+
+public class RssUnregisterShuffleByAppIdResponse extends ClientResponse {
+  // Adds no fields of its own; the status code is passed straight to ClientResponse.
+  public RssUnregisterShuffleByAppIdResponse(StatusCode statusCode) {
+    super(statusCode);
+  }
+}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 9f601f242..aab38efd4 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -26,6 +26,7 @@ package rss.common;
 service ShuffleServer {
   rpc registerShuffle (ShuffleRegisterRequest) returns 
(ShuffleRegisterResponse);
   rpc unregisterShuffle(ShuffleUnregisterRequest) returns 
(ShuffleUnregisterResponse);
+  rpc unregisterShuffleByAppId(ShuffleUnregisterByAppIdRequest) returns 
(ShuffleUnregisterByAppIdResponse);
   rpc sendShuffleData (SendShuffleDataRequest) returns 
(SendShuffleDataResponse);
   rpc getLocalShuffleIndex (GetLocalShuffleIndexRequest) returns 
(GetLocalShuffleIndexResponse);
   rpc getLocalShuffleData (GetLocalShuffleDataRequest) returns 
(GetLocalShuffleDataResponse);
@@ -197,6 +198,15 @@ message ShuffleRegisterResponse {
   string retMsg = 2;
 }
 
+message ShuffleUnregisterByAppIdRequest {
+  string appId = 1; // id of the application to unregister on the receiving shuffle server
+}
+
+message ShuffleUnregisterByAppIdResponse {
+  StatusCode status = 1; // outcome of the call; the Java client accepts only SUCCESS
+  string retMsg = 2; // free-text message; the Java client quotes it when status is not SUCCESS
+}
+
 message SendShuffleDataRequest {
   string appId = 1;
   int32 shuffleId = 2;
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 65c2b998b..ac9b95c34 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -99,6 +99,30 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
     this.shuffleServer = shuffleServer;
   }
 
+  /** Queues an asynchronous purge for the requested app and reports the outcome to the caller. */
+  @Override
+  public void unregisterShuffleByAppId(
+      RssProtos.ShuffleUnregisterByAppIdRequest request,
+      StreamObserver<RssProtos.ShuffleUnregisterByAppIdResponse> responseStreamObserver) {
+    String appId = request.getAppId();
+    StatusCode result = StatusCode.SUCCESS;
+    String responseMessage = "OK";
+    try {
+      shuffleServer.getShuffleTaskManager().removeShuffleDataAsync(appId);
+    } catch (Exception e) {
+      result = StatusCode.INTERNAL_ERROR;
+      // Never leave "OK" on a failure: the client copies retMsg into the exception it throws.
+      responseMessage = "Failed to unregister app[" + appId + "]: " + e;
+    }
+    RssProtos.ShuffleUnregisterByAppIdResponse reply =
+        RssProtos.ShuffleUnregisterByAppIdResponse.newBuilder()
+            .setStatus(result.toProto())
+            .setRetMsg(responseMessage)
+            .build();
+    responseStreamObserver.onNext(reply);
+    responseStreamObserver.onCompleted();
+  }
+
   @Override
   public void unregisterShuffle(
       RssProtos.ShuffleUnregisterRequest request,
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 04d62dabe..7977b8016 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -70,6 +70,7 @@ import 
org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
 import org.apache.uniffle.server.buffer.ShuffleBuffer;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
 import org.apache.uniffle.server.event.AppPurgeEvent;
+import org.apache.uniffle.server.event.AppUnregisterPurgeEvent;
 import org.apache.uniffle.server.event.PurgeEvent;
 import org.apache.uniffle.server.event.ShufflePurgeEvent;
 import org.apache.uniffle.server.storage.StorageManager;
@@ -183,6 +184,12 @@ public class ShuffleTaskManager {
                     (System.currentTimeMillis() - startTime) / 
Constants.MILLION_SECONDS_PER_SECOND;
                 
ShuffleServerMetrics.summaryTotalRemoveResourceTime.observe(usedTime);
               }
+              if (event instanceof AppUnregisterPurgeEvent) {
+                removeResources(event.getAppId(), false);
+                double usedTime =
+                    (System.currentTimeMillis() - startTime) / 
Constants.MILLION_SECONDS_PER_SECOND;
+                
ShuffleServerMetrics.summaryTotalRemoveResourceTime.observe(usedTime);
+              }
               if (event instanceof ShufflePurgeEvent) {
                 removeResourcesByShuffleIds(event.getAppId(), 
event.getShuffleIds());
                 double usedTime =
@@ -842,6 +849,10 @@ public class ShuffleTaskManager {
         new ShufflePurgeEvent(appId, getUserByAppId(appId), 
Arrays.asList(shuffleId)));
   }
 
+  public void removeShuffleDataAsync(String appId) { // app-level variant: only queues the event
+    expiredAppIdQueue.add(new AppUnregisterPurgeEvent(appId, getUserByAppId(appId)));
+  }
+
   @VisibleForTesting
   void removeShuffleDataSync(String appId, int shuffleId) {
     removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId));
diff --git 
a/server/src/main/java/org/apache/uniffle/server/event/AppUnregisterPurgeEvent.java
 
b/server/src/main/java/org/apache/uniffle/server/event/AppUnregisterPurgeEvent.java
new file mode 100644
index 000000000..04d6318ce
--- /dev/null
+++ 
b/server/src/main/java/org/apache/uniffle/server/event/AppUnregisterPurgeEvent.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.event;
+
+public class AppUnregisterPurgeEvent extends PurgeEvent { // purge event for an app unregister
+  public AppUnregisterPurgeEvent(String appId, String user) {
+    super(appId, user, null); // null third arg: presumably "no shuffle-id list" — TODO confirm
+  }
+}

Reply via email to