This is an automated email from the ASF dual-hosted git repository.
maobaolong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new eed5ebaf6 [MINOR] fix(server): Server heartbeat to coordinator while
unregister shuffle (#2132)
eed5ebaf6 is described below
commit eed5ebaf60ca9d0f2aa6205c031272ed36789d67
Author: maobaolong <[email protected]>
AuthorDate: Tue Jan 7 14:23:28 2025 +0800
[MINOR] fix(server): Server heartbeat to coordinator while unregister
shuffle (#2132)
<!--
1. Title: [#<issue>] <type>(<scope>): <subject>
Examples:
- "[#123] feat(operator): support xxx"
- "[#233] fix: check null before access result in xxx"
- "[MINOR] refactor: fix typo in variable name"
- "[MINOR] docs: fix typo in README"
- "[#255] test: fix flaky test NameOfTheTest"
Reference: https://www.conventionalcommits.org/en/v1.0.0/
2. Contributor guidelines:
https://github.com/apache/incubator-uniffle/blob/master/CONTRIBUTING.md
3. If the PR is unfinished, please mark this PR as draft.
-->
### What changes were proposed in this pull request?
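When enabled, the shuffle server sends a heartbeat to the coordinator as soon as it
receives an unregister request (for a whole app or for a single shuffle). This is
controlled by the new option `rss.server.heartbeatReportOnUnregisterEnabled`
(default: `false`).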
### Why are the changes needed?
Without this PR, the server does not report updated app info to the coordinator via
heartbeat after an app is unregistered. The coordinator and dashboard may therefore
display outdated information until the next regular heartbeat.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested with a small Spark job run via spark-shell:
- shell command line
```shell
bin/spark-shell --master spark://localhost:7077 --deploy-mode client \
  --conf spark.rss.coordinator.quorum=localhost:19999 \
  --conf spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager \
  --conf spark.rss.storage.type=LOCALFILE \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.rss.test.mode.enable=true \
  --conf spark.rss.client.type=GRPC_NETTY \
  --conf spark.default.parallelism=16 \
  -i test.scala
```
- test.scala
```scala
val data = sc.parallelize(Seq(("A", 1), ("B", 2), ("C", 3), ("A", 4), ("B",
5), ("A", 6), ("A", 7),("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A",
7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7)));
val result = data.reduceByKey(_ + _);
result.collect().foreach(println);
System.exit(0);
```
<img width="2540" alt="image"
src="https://github.com/user-attachments/assets/bbd7c534-9fea-4e86-ba5e-84dff3d9c98c">
---
.../java/org/apache/uniffle/server/ShuffleServerConf.java | 6 ++++++
.../org/apache/uniffle/server/ShuffleServerGrpcService.java | 11 +++++++++++
2 files changed, 17 insertions(+)
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index b5951c73d..d197b8286 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -766,6 +766,12 @@ public class ShuffleServerConf extends RssBaseConf {
.defaultValue(LocalStorageManager.class.getName())
.withDescription("The class of local storage manager
implementation");
+ public static final ConfigOption<Boolean>
SERVER_HEARTBEAT_REPORT_ON_UNREGISTER_ENABLED =
+ ConfigOptions.key("rss.server.heartbeatReportOnUnregisterEnabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to trigger report while unregister");
+
public ShuffleServerConf() {}
public ShuffleServerConf(String fileName) {
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 9c32e8621..356a58d84 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -115,6 +115,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
private final ShuffleServer shuffleServer;
private boolean isRpcAuditLogEnabled;
private List<String> rpcAuditExcludeOpList;
+ private boolean reportOnUnregisterEnabled;
public ShuffleServerGrpcService(ShuffleServer shuffleServer) {
this.shuffleServer = shuffleServer;
@@ -128,6 +129,10 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
.getShuffleServerConf()
.getReconfigurableConf(ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST)
.get();
+ reportOnUnregisterEnabled =
+ shuffleServer
+ .getShuffleServerConf()
+
.getBoolean(ShuffleServerConf.SERVER_HEARTBEAT_REPORT_ON_UNREGISTER_ENABLED);
ReconfigurableRegistry.register(
Sets.newHashSet(
ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED.key(),
@@ -168,6 +173,9 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
}
String responseMessage = "OK";
try {
+ if (reportOnUnregisterEnabled) {
+ shuffleServer.sendHeartbeat();
+ }
shuffleServer.getShuffleTaskManager().removeShuffleDataAsync(appId);
if (shuffleServer.isRemoteMergeEnable()) {
shuffleServer.getShuffleTaskManager().removeShuffleDataAsync(appId +
MERGE_APP_SUFFIX);
@@ -210,6 +218,9 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
}
String responseMessage = "OK";
try {
+ if (reportOnUnregisterEnabled) {
+ shuffleServer.sendHeartbeat();
+ }
shuffleServer.getShuffleTaskManager().removeShuffleDataAsync(appId,
shuffleId);
if (shuffleServer.isRemoteMergeEnable()) {
shuffleServer