This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 9471cba43 [#1476] feat(spark): Provide dedicated unregister app rpc
interface (#1510)
9471cba43 is described below
commit 9471cba436d0cec604a780faa01da5d1341fef5a
Author: Yiqi You <[email protected]>
AuthorDate: Thu Feb 8 09:26:28 2024 +0800
[#1476] feat(spark): Provide dedicated unregister app rpc interface (#1510)
### What changes were proposed in this pull request?
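Introduce a dedicated `unregisterShuffleByAppId` RPC interface, so that when an
application's shuffles are unregistered the client sends a single request to each
shuffle server instead of one request per shuffle.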
### Why are the changes needed?
Fix: [#1476](https://github.com/apache/incubator-uniffle/issues/1476)
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests (UT).
---
.../client/impl/ShuffleWriteClientImpl.java | 40 +++++++++++++++++++++-
.../uniffle/client/api/ShuffleServerClient.java | 5 +++
.../client/impl/grpc/ShuffleServerGrpcClient.java | 33 ++++++++++++++++++
.../RssUnregisterShuffleByAppIdRequest.java | 30 ++++++++++++++++
.../RssUnregisterShuffleByAppIdResponse.java | 27 +++++++++++++++
proto/src/main/proto/Rss.proto | 10 ++++++
.../uniffle/server/ShuffleServerGrpcService.java | 24 +++++++++++++
.../apache/uniffle/server/ShuffleTaskManager.java | 11 ++++++
.../server/event/AppUnregisterPurgeEvent.java | 24 +++++++++++++
9 files changed, 203 insertions(+), 1 deletion(-)
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index faa1f247c..337869d22 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -60,6 +60,7 @@ import
org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.client.request.RssUnregisterShuffleByAppIdRequest;
import org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
import org.apache.uniffle.client.response.ClientResponse;
import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
@@ -73,6 +74,7 @@ import
org.apache.uniffle.client.response.RssRegisterShuffleResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssSendCommitResponse;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
+import org.apache.uniffle.client.response.RssUnregisterShuffleByAppIdResponse;
import org.apache.uniffle.client.response.RssUnregisterShuffleResponse;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.client.util.ClientUtils;
@@ -984,6 +986,8 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
@Override
public void unregisterShuffle(String appId) {
+ RssUnregisterShuffleByAppIdRequest request = new
RssUnregisterShuffleByAppIdRequest(appId);
+
if (appId == null) {
return;
}
@@ -991,7 +995,50 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
if (appServerMap == null) {
return;
}
- appServerMap.keySet().forEach(shuffleId -> unregisterShuffle(appId,
shuffleId));
+ // Each distinct server in this set receives one unregister-by-appId RPC.
+ Set<ShuffleServerInfo> shuffleServerInfos = getAllShuffleServers(appId);
+ if (shuffleServerInfos.isEmpty()) {
+ // Nothing to notify. Math.min(..., 0) below would request a zero-thread pool,
+ // which a fixed thread pool rejects (ThreadPoolExecutor needs at least one thread;
+ // assumes ThreadUtils builds a standard fixed pool), so just drop the app here.
+ shuffleServerInfoMap.remove(appId);
+ return;
+ }
+
+ ExecutorService executorService = null;
+ try {
+ executorService =
+ ThreadUtils.getDaemonFixedThreadPool(
+ Math.min(unregisterThreadPoolSize, shuffleServerInfos.size()),
"unregister-shuffle");
+
+ ThreadUtils.executeTasks(
+ executorService,
+ shuffleServerInfos,
+ shuffleServerInfo -> {
+ try {
+ ShuffleServerClient client =
+ ShuffleServerClientFactory.getInstance()
+ .getShuffleServerClient(clientType, shuffleServerInfo,
rssConf);
+ RssUnregisterShuffleByAppIdResponse response =
+ client.unregisterShuffleByAppId(request);
+ if (response.getStatusCode() != StatusCode.SUCCESS) {
+ LOG.warn("Failed to unregister shuffle to " +
shuffleServerInfo);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error happened when unregistering to " +
shuffleServerInfo, e);
+ }
+ return null;
+ },
+ unregisterRequestTimeSec,
+ "unregister shuffle server");
+
+ } finally {
+ if (executorService != null) {
+ executorService.shutdownNow();
+ }
+ // Forget the app locally even if some servers failed or timed out.
+ shuffleServerInfoMap.remove(appId);
+ }
}
private void throwExceptionIfNecessary(ClientResponse response, String
errorMsg) {
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
index eb4cd398f..8be3d67cb 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
@@ -28,6 +28,7 @@ import
org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.client.request.RssUnregisterShuffleByAppIdRequest;
import org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
import org.apache.uniffle.client.response.RssFinishShuffleResponse;
@@ -39,6 +40,7 @@ import
org.apache.uniffle.client.response.RssRegisterShuffleResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssSendCommitResponse;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
+import org.apache.uniffle.client.response.RssUnregisterShuffleByAppIdResponse;
import org.apache.uniffle.client.response.RssUnregisterShuffleResponse;
public interface ShuffleServerClient {
@@ -47,6 +49,17 @@ public interface ShuffleServerClient {
RssRegisterShuffleResponse registerShuffle(RssRegisterShuffleRequest
request);
+ /**
+ * Unregisters the request's application on this shuffle server with a single call,
+ * instead of one call per shuffle.
+ *
+ * @param request carries the id of the application to unregister
+ * @return the server's reply; the gRPC implementation throws RssException instead of
+ * returning a non-SUCCESS status
+ */
+ RssUnregisterShuffleByAppIdResponse unregisterShuffleByAppId(
+ RssUnregisterShuffleByAppIdRequest request);
+
RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest
request);
RssSendCommitResponse sendCommit(RssSendCommitRequest request);
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 6bae6f62c..7297aec9d 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -45,6 +45,7 @@ import
org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.client.request.RssUnregisterShuffleByAppIdRequest;
import org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
import org.apache.uniffle.client.response.RssFinishShuffleResponse;
@@ -56,6 +57,7 @@ import
org.apache.uniffle.client.response.RssRegisterShuffleResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssSendCommitResponse;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
+import org.apache.uniffle.client.response.RssUnregisterShuffleByAppIdResponse;
import org.apache.uniffle.client.response.RssUnregisterShuffleResponse;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.PartitionRange;
@@ -328,6 +330,34 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
return result;
}
+ /** Issues the raw unregisterShuffleByAppId gRPC call through the blocking stub. */
+ private RssProtos.ShuffleUnregisterByAppIdResponse doUnregisterShuffleByAppId(String appId) {
+ return blockingStub.unregisterShuffleByAppId(
+ RssProtos.ShuffleUnregisterByAppIdRequest.newBuilder().setAppId(appId).build());
+ }
+
+ /**
+ * Unregisters the request's application on this shuffle server with a single RPC.
+ *
+ * @return a response carrying {@code StatusCode.SUCCESS} when the server reports success
+ * @throws RssException if the server replies with any other status (logged before throwing)
+ */
+ @Override
+ public RssUnregisterShuffleByAppIdResponse unregisterShuffleByAppId(
+ RssUnregisterShuffleByAppIdRequest request) {
+ String appId = request.getAppId();
+ RssProtos.ShuffleUnregisterByAppIdResponse rpcResponse = doUnregisterShuffleByAppId(appId);
+ if (rpcResponse.getStatus() != RssProtos.StatusCode.SUCCESS) {
+ String msg =
+ String.format(
+ "Errors on unregister app to %s:%s for appId[%s], error: %s",
+ host, port, appId, rpcResponse.getRetMsg());
+ LOG.error(msg);
+ throw new RssException(msg);
+ }
+ return new RssUnregisterShuffleByAppIdResponse(StatusCode.SUCCESS);
+ }
+
private RssProtos.ShuffleUnregisterResponse doUnregisterShuffle(String
appId, int shuffleId) {
RssProtos.ShuffleUnregisterRequest request =
RssProtos.ShuffleUnregisterRequest.newBuilder()
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterShuffleByAppIdRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterShuffleByAppIdRequest.java
new file mode 100644
index 000000000..0992355a5
--- /dev/null
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterShuffleByAppIdRequest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.request;
+
+/**
+ * Client-side request for the {@code unregisterShuffleByAppId} RPC. It carries only the id of
+ * the application to unregister.
+ */
+public class RssUnregisterShuffleByAppIdRequest {
+ private final String appId;
+
+ /** Creates a request for {@code appId}; the value is not null-checked here. */
+ public RssUnregisterShuffleByAppIdRequest(String appId) {
+ this.appId = appId;
+ }
+
+ /** Returns the id of the application to unregister. */
+ public String getAppId() {
+ return appId;
+ }
+}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleByAppIdResponse.java
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleByAppIdResponse.java
new file mode 100644
index 000000000..5c01e84e9
--- /dev/null
+++
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleByAppIdResponse.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.response;
+
+import org.apache.uniffle.common.rpc.StatusCode;
+
+/**
+ * Client-side reply to the {@code unregisterShuffleByAppId} RPC. It adds nothing to
+ * {@link ClientResponse}; callers read the outcome through {@code getStatusCode()}.
+ */
+public class RssUnregisterShuffleByAppIdResponse extends ClientResponse {
+ /** Wraps the status reported for the unregister-by-appId call. */
+ public RssUnregisterShuffleByAppIdResponse(StatusCode statusCode) {
+ super(statusCode);
+ }
+}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 9f601f242..aab38efd4 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -26,6 +26,7 @@ package rss.common;
service ShuffleServer {
rpc registerShuffle (ShuffleRegisterRequest) returns
(ShuffleRegisterResponse);
rpc unregisterShuffle(ShuffleUnregisterRequest) returns
(ShuffleUnregisterResponse);
+ rpc unregisterShuffleByAppId(ShuffleUnregisterByAppIdRequest) returns
(ShuffleUnregisterByAppIdResponse);
rpc sendShuffleData (SendShuffleDataRequest) returns
(SendShuffleDataResponse);
rpc getLocalShuffleIndex (GetLocalShuffleIndexRequest) returns
(GetLocalShuffleIndexResponse);
rpc getLocalShuffleData (GetLocalShuffleDataRequest) returns
(GetLocalShuffleDataResponse);
@@ -197,6 +198,19 @@ message ShuffleRegisterResponse {
string retMsg = 2;
}
+// Request for the unregisterShuffleByAppId rpc: names the application whose shuffles
+// the shuffle server should unregister in a single call.
+message ShuffleUnregisterByAppIdRequest {
+ string appId = 1;
+}
+
+// Response for the unregisterShuffleByAppId rpc. The current server replies SUCCESS
+// with retMsg "OK" once it has queued the purge, or INTERNAL_ERROR if queuing fails.
+message ShuffleUnregisterByAppIdResponse {
+ StatusCode status = 1;
+ string retMsg = 2;
+}
+
message SendShuffleDataRequest {
string appId = 1;
int32 shuffleId = 2;
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 65c2b998b..ac9b95c34 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -99,6 +99,30 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
this.shuffleServer = shuffleServer;
}
+ @Override
+ public void unregisterShuffleByAppId(
+ RssProtos.ShuffleUnregisterByAppIdRequest request,
+ StreamObserver<RssProtos.ShuffleUnregisterByAppIdResponse>
responseStreamObserver) {
+ String appId = request.getAppId();
+
+ StatusCode result = StatusCode.SUCCESS;
+ String responseMessage = "OK";
+ try {
+ shuffleServer.getShuffleTaskManager().removeShuffleDataAsync(appId);
+
+ } catch (Exception e) {
+ result = StatusCode.INTERNAL_ERROR;
+ }
+
+ RssProtos.ShuffleUnregisterByAppIdResponse reply =
+ RssProtos.ShuffleUnregisterByAppIdResponse.newBuilder()
+ .setStatus(result.toProto())
+ .setRetMsg(responseMessage)
+ .build();
+ responseStreamObserver.onNext(reply);
+ responseStreamObserver.onCompleted();
+ }
+
@Override
public void unregisterShuffle(
RssProtos.ShuffleUnregisterRequest request,
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 04d62dabe..7977b8016 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -70,6 +70,7 @@ import
org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
import org.apache.uniffle.server.buffer.ShuffleBuffer;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.server.event.AppPurgeEvent;
+import org.apache.uniffle.server.event.AppUnregisterPurgeEvent;
import org.apache.uniffle.server.event.PurgeEvent;
import org.apache.uniffle.server.event.ShufflePurgeEvent;
import org.apache.uniffle.server.storage.StorageManager;
@@ -183,6 +184,12 @@ public class ShuffleTaskManager {
(System.currentTimeMillis() - startTime) /
Constants.MILLION_SECONDS_PER_SECOND;
ShuffleServerMetrics.summaryTotalRemoveResourceTime.observe(usedTime);
}
+ if (event instanceof AppUnregisterPurgeEvent) {
+ removeResources(event.getAppId(), false);
+ double usedTime =
+ (System.currentTimeMillis() - startTime) /
Constants.MILLION_SECONDS_PER_SECOND;
+
ShuffleServerMetrics.summaryTotalRemoveResourceTime.observe(usedTime);
+ }
if (event instanceof ShufflePurgeEvent) {
removeResourcesByShuffleIds(event.getAppId(),
event.getShuffleIds());
double usedTime =
@@ -842,6 +849,10 @@ public class ShuffleTaskManager {
new ShufflePurgeEvent(appId, getUserByAppId(appId),
Arrays.asList(shuffleId)));
}
+ public void removeShuffleDataAsync(String appId) {
+ expiredAppIdQueue.add(new AppUnregisterPurgeEvent(appId,
getUserByAppId(appId)));
+ }
+
@VisibleForTesting
void removeShuffleDataSync(String appId, int shuffleId) {
removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId));
diff --git
a/server/src/main/java/org/apache/uniffle/server/event/AppUnregisterPurgeEvent.java
b/server/src/main/java/org/apache/uniffle/server/event/AppUnregisterPurgeEvent.java
new file mode 100644
index 000000000..04d6318ce
--- /dev/null
+++
b/server/src/main/java/org/apache/uniffle/server/event/AppUnregisterPurgeEvent.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.event;
+
+/**
+ * Purge event queued by {@code ShuffleTaskManager#removeShuffleDataAsync(String)} when a client
+ * unregisters a whole application through the dedicated unregister-by-appId RPC.
+ */
+public class AppUnregisterPurgeEvent extends PurgeEvent {
+ public AppUnregisterPurgeEvent(String appId, String user) {
+ // No shuffle-id list is attached: the event targets the whole application.
+ super(appId, user, null);
+ }
+}