This is an automated email from the ASF dual-hosted git repository.

maobaolong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new eed5ebaf6 [MINOR] fix(server): Server heartbeat to coordinator while 
unregister shuffle (#2132)
eed5ebaf6 is described below

commit eed5ebaf60ca9d0f2aa6205c031272ed36789d67
Author: maobaolong <[email protected]>
AuthorDate: Tue Jan 7 14:23:28 2025 +0800

    [MINOR] fix(server): Server heartbeat to coordinator while unregister 
shuffle (#2132)
    
    <!--
    1. Title: [#<issue>] <type>(<scope>): <subject>
       Examples:
         - "[#123] feat(operator): support xxx"
         - "[#233] fix: check null before access result in xxx"
         - "[MINOR] refactor: fix typo in variable name"
         - "[MINOR] docs: fix typo in README"
         - "[#255] test: fix flaky test NameOfTheTest"
       Reference: https://www.conventionalcommits.org/en/v1.0.0/
    2. Contributor guidelines:
       https://github.com/apache/incubator-uniffle/blob/master/CONTRIBUTING.md
    3. If the PR is unfinished, please mark this PR as draft.
    -->
    
    ### What changes were proposed in this pull request?
    
    The server sends a heartbeat to the coordinator when it receives an
    unregister shuffle request.
    
    ### Why are the changes needed?
    
    Without this PR, the server could not heartbeat the updated app info after
    unregistering an app, so the coordinator and dashboard could display
    outdated information.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tested with a small Spark job run from spark-shell.
    
    - shell command line
    
    ```shell
    bin/spark-shell  --master  spark://localhost:7077  --deploy-mode client 
--conf spark.rss.coordinator.quorum=localhost:19999   --conf 
spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager  --conf 
spark.rss.storage.type=LOCALFILE --conf 
spark.serializer=org.apache.spark.serializer.KryoSerializer --conf 
spark.rss.test.mode.enable=true --conf spark.rss.client.type=GRPC_NETTY --conf 
spark.default.parallelism=16  -i test.scala
    ```
    
    - test.scala
    ```scala
    val data = sc.parallelize(Seq(("A", 1), ("B", 2), ("C", 3), ("A", 4), ("B", 
5), ("A", 6), ("A", 7),("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 
7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7)));
    val result = data.reduceByKey(_ + _);
    result.collect().foreach(println);
    System.exit(0);
    ```
    
    <img width="2540" alt="image" 
src="https://github.com/user-attachments/assets/bbd7c534-9fea-4e86-ba5e-84dff3d9c98c">
---
 .../java/org/apache/uniffle/server/ShuffleServerConf.java     |  6 ++++++
 .../org/apache/uniffle/server/ShuffleServerGrpcService.java   | 11 +++++++++++
 2 files changed, 17 insertions(+)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index b5951c73d..d197b8286 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -766,6 +766,12 @@ public class ShuffleServerConf extends RssBaseConf {
           .defaultValue(LocalStorageManager.class.getName())
           .withDescription("The class of local storage manager 
implementation");
 
+  public static final ConfigOption<Boolean> 
SERVER_HEARTBEAT_REPORT_ON_UNREGISTER_ENABLED =
+      ConfigOptions.key("rss.server.heartbeatReportOnUnregisterEnabled")
+          .booleanType()
+          .defaultValue(false)
+          .withDescription("Whether to trigger report while unregister");
+
   public ShuffleServerConf() {}
 
   public ShuffleServerConf(String fileName) {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 9c32e8621..356a58d84 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -115,6 +115,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
   private final ShuffleServer shuffleServer;
   private boolean isRpcAuditLogEnabled;
   private List<String> rpcAuditExcludeOpList;
+  private boolean reportOnUnregisterEnabled;
 
   public ShuffleServerGrpcService(ShuffleServer shuffleServer) {
     this.shuffleServer = shuffleServer;
@@ -128,6 +129,10 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
             .getShuffleServerConf()
             
.getReconfigurableConf(ShuffleServerConf.SERVER_RPC_RPC_AUDIT_LOG_EXCLUDE_LIST)
             .get();
+    reportOnUnregisterEnabled =
+        shuffleServer
+            .getShuffleServerConf()
+            
.getBoolean(ShuffleServerConf.SERVER_HEARTBEAT_REPORT_ON_UNREGISTER_ENABLED);
     ReconfigurableRegistry.register(
         Sets.newHashSet(
             ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED.key(),
@@ -168,6 +173,9 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
       }
       String responseMessage = "OK";
       try {
+        if (reportOnUnregisterEnabled) {
+          shuffleServer.sendHeartbeat();
+        }
         shuffleServer.getShuffleTaskManager().removeShuffleDataAsync(appId);
         if (shuffleServer.isRemoteMergeEnable()) {
           shuffleServer.getShuffleTaskManager().removeShuffleDataAsync(appId + 
MERGE_APP_SUFFIX);
@@ -210,6 +218,9 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
       }
       String responseMessage = "OK";
       try {
+        if (reportOnUnregisterEnabled) {
+          shuffleServer.sendHeartbeat();
+        }
         shuffleServer.getShuffleTaskManager().removeShuffleDataAsync(appId, 
shuffleId);
         if (shuffleServer.isRemoteMergeEnable()) {
           shuffleServer

Reply via email to