This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new fe1f30469 [MINOR] improve(server): Add return value to server rpc 
audit log (#2007)
fe1f30469 is described below

commit fe1f30469de74f6e7e36c7a39e45c47f1396d79e
Author: maobaolong <[email protected]>
AuthorDate: Mon Aug 12 10:21:46 2024 +0800

    [MINOR] improve(server): Add return value to server rpc audit log (#2007)
    
    ### What changes were proposed in this pull request?
    
    Add return value for server rpc audit log
    
    ### Why are the changes needed?
    
    With the return value, we can know clear about the effect of the rpc call.
    
    E.g. A bundle of `requireBuffer` calls appear in the rpc audit log, then 
followed some `sendShuffleData` rpc call with `requireBufferId`, but we don't 
know which `requireBuffer` call match the sendShuffleData call.
    
    With this PR, we record the requireBufferId also which generated by 
`requireBuffer` call, and search the sendShuffleData call with the 
`requireBufferId`, this can be possible.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Test Locally.
---
 .../coordinator/CoordinatorGrpcService.java        |  46 ++++----
 .../audit/CoordinatorRPCAuditContext.java          |  16 +--
 .../uniffle/server/ShuffleServerGrpcService.java   | 118 +++++++++++----------
 .../server/audit/ServerRPCAuditContext.java        |  27 +++--
 .../server/netty/ShuffleServerNettyHandler.java    |  64 ++++++-----
 5 files changed, 148 insertions(+), 123 deletions(-)

diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index c26f8627d..d618211e0 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -135,8 +135,8 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
       final int estimateTaskConcurrency = request.getEstimateTaskConcurrency();
       final Set<String> faultyServerIds = new 
HashSet<>(request.getFaultyServerIdsList());
 
-      auditContext.setAppId(appId).setShuffleId(shuffleId);
-      auditContext.setArgs(
+      auditContext.withAppId(appId).withShuffleId(shuffleId);
+      auditContext.withArgs(
           String.format(
               "partitionNum=%d, partitionNumPerRange=%d, replica=%d, 
requiredTags=%s, "
                   + "requiredShuffleServerNumber=%d, faultyServerIds=%s, 
stageId=%d, stageAttemptNumber=%d, isReassign=%b",
@@ -205,7 +205,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
         responseObserver.onNext(response);
       } finally {
         if (response != null) {
-          auditContext.setStatusCode(response.getStatus());
+          auditContext.withStatusCode(response.getStatus());
         }
         responseObserver.onCompleted();
       }
@@ -218,7 +218,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
       StreamObserver<ShuffleServerHeartBeatResponse> responseObserver) {
     try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("heartbeat")) {
       final ServerNode serverNode = toServerNode(request);
-      auditContext.setArgs("serverNode=" + serverNode.getId());
+      auditContext.withArgs("serverNode=" + serverNode.getId());
       coordinatorServer.getClusterManager().add(serverNode);
       final ShuffleServerHeartBeatResponse response =
           ShuffleServerHeartBeatResponse.newBuilder()
@@ -228,7 +228,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
       if (LOG.isDebugEnabled()) {
         LOG.debug("Got heartbeat from {}", serverNode);
       }
-      auditContext.setStatusCode(response.getStatus());
+      auditContext.withStatusCode(response.getStatus());
       responseObserver.onNext(response);
       responseObserver.onCompleted();
     }
@@ -242,7 +242,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
           CheckServiceAvailableResponse.newBuilder()
               
.setAvailable(coordinatorServer.getClusterManager().getNodesNum() > 0)
               .build();
-      auditContext.setStatusCode(StatusCode.SUCCESS);
+      auditContext.withStatusCode(StatusCode.SUCCESS);
       responseObserver.onNext(response);
       responseObserver.onCompleted();
     }
@@ -257,7 +257,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
       final int clientPort = request.getClientPort();
       final ShuffleServerId shuffleServer = request.getServer();
       final String operation = request.getOperation();
-      auditContext.setArgs(
+      auditContext.withArgs(
           String.format("%s:%s->%s->%s", clientHost, clientPort, operation, 
shuffleServer));
 
       LOG.info(clientHost + ":" + clientPort + "->" + operation + "->" + 
shuffleServer);
@@ -266,7 +266,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
               .setRetMsg("")
               .setStatus(StatusCode.SUCCESS)
               .build();
-      auditContext.setStatusCode(response.getStatus());
+      auditContext.withStatusCode(response.getStatus());
       responseObserver.onNext(response);
       responseObserver.onCompleted();
     }
@@ -277,7 +277,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
       AppHeartBeatRequest request, StreamObserver<AppHeartBeatResponse> 
responseObserver) {
     try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("appHeartbeat")) {
       String appId = request.getAppId();
-      auditContext.setAppId(appId);
+      auditContext.withAppId(appId);
       coordinatorServer.getApplicationManager().refreshAppId(appId);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Got heartbeat from application: {}", appId);
@@ -288,12 +288,12 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
       if (Context.current().isCancelled()) {
         responseObserver.onError(
             Status.CANCELLED.withDescription("Cancelled by 
client").asRuntimeException());
-        auditContext.setStatusCode("CANCELLED");
+        auditContext.withStatusCode("CANCELLED");
         LOG.warn("Cancelled by client {} for after deadline.", appId);
         return;
       }
 
-      auditContext.setStatusCode(response.getStatus());
+      auditContext.withStatusCode(response.getStatus());
       responseObserver.onNext(response);
       responseObserver.onCompleted();
     }
@@ -305,7 +305,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
     try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("registerApplicationInfo")) {
       String appId = request.getAppId();
       String user = request.getUser();
-      auditContext.setAppId(appId).setArgs("user=" + user);
+      auditContext.withAppId(appId).withArgs("user=" + user);
       coordinatorServer
           .getApplicationManager()
           .registerApplicationInfo(appId, user, request.getVersion(), 
request.getGitCommitId());
@@ -318,12 +318,12 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
       if (Context.current().isCancelled()) {
         responseObserver.onError(
             Status.CANCELLED.withDescription("Cancelled by 
client").asRuntimeException());
-        auditContext.setStatusCode("CANCELLED");
+        auditContext.withStatusCode("CANCELLED");
         LOG.warn("Cancelled by client {} for after deadline.", appId);
         return;
       }
 
-      auditContext.setStatusCode(response.getStatus());
+      auditContext.withStatusCode(response.getStatus());
       responseObserver.onNext(response);
       responseObserver.onCompleted();
     }
@@ -344,7 +344,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
               request.getExtraPropertiesMap(),
               request.getUser());
 
-      auditContext.setArgs("accessInfo=" + accessInfo);
+      auditContext.withArgs("accessInfo=" + accessInfo);
 
       AccessCheckResult result = accessManager.handleAccessRequest(accessInfo);
       if (!result.isSuccess()) {
@@ -361,12 +361,12 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
       if (Context.current().isCancelled()) {
         responseObserver.onError(
             Status.CANCELLED.withDescription("Cancelled by 
client").asRuntimeException());
-        auditContext.setStatusCode("CANCELLED");
+        auditContext.withStatusCode("CANCELLED");
         LOG.warn("Cancelled by client {} for after deadline.", accessInfo);
         return;
       }
 
-      auditContext.setStatusCode(response.getStatus());
+      auditContext.withStatusCode(response.getStatus());
       responseObserver.onNext(response);
       responseObserver.onCompleted();
     }
@@ -378,7 +378,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
       Empty empty, StreamObserver<FetchClientConfResponse> responseObserver) {
     try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("fetchClientConf")) {
       fetchClientConfImpl(RssClientConfFetchInfo.EMPTY_CLIENT_CONF_FETCH_INFO, 
responseObserver);
-      auditContext.setStatusCode(StatusCode.SUCCESS);
+      auditContext.withStatusCode(StatusCode.SUCCESS);
     }
   }
 
@@ -387,7 +387,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
       FetchClientConfRequest request, StreamObserver<FetchClientConfResponse> 
responseObserver) {
     try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("fetchClientConfV2")) {
       fetchClientConfImpl(RssClientConfFetchInfo.fromProto(request), 
responseObserver);
-      auditContext.setStatusCode(StatusCode.SUCCESS);
+      auditContext.withStatusCode(StatusCode.SUCCESS);
     }
   }
 
@@ -430,7 +430,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
       FetchRemoteStorageResponse response;
       StatusCode status = StatusCode.SUCCESS;
       String appId = request.getAppId();
-      auditContext.setAppId(appId);
+      auditContext.withAppId(appId);
       try {
         RemoteStorage.Builder rsBuilder = RemoteStorage.newBuilder();
         RemoteStorageInfo rsInfo =
@@ -461,12 +461,12 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
       if (Context.current().isCancelled()) {
         responseObserver.onError(
             Status.CANCELLED.withDescription("Cancelled by 
client").asRuntimeException());
-        auditContext.setStatusCode("CANCELLED");
+        auditContext.withStatusCode("CANCELLED");
         LOG.warn("Fetch client conf cancelled by client for after deadline.");
         return;
       }
 
-      auditContext.setStatusCode(response.getStatus());
+      auditContext.withStatusCode(response.getStatus());
       responseObserver.onNext(response);
       responseObserver.onCompleted();
     }
@@ -536,7 +536,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
     }
     CoordinatorRPCAuditContext auditContext = new 
CoordinatorRPCAuditContext(auditLogger);
     if (auditLogger != null) {
-      auditContext.setCommand(command).setCreationTimeNs(System.nanoTime());
+      auditContext.withCommand(command).withCreationTimeNs(System.nanoTime());
     }
     return auditContext;
   }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRPCAuditContext.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRPCAuditContext.java
index 7a6453bfb..a66bd2805 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRPCAuditContext.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRPCAuditContext.java
@@ -48,7 +48,7 @@ public class CoordinatorRPCAuditContext implements 
AuditContext {
    * @param command the command associated with shuffle server rpc
    * @return this {@link AuditContext} instance
    */
-  public CoordinatorRPCAuditContext setCommand(String command) {
+  public CoordinatorRPCAuditContext withCommand(String command) {
     this.command = command;
     return this;
   }
@@ -60,7 +60,7 @@ public class CoordinatorRPCAuditContext implements 
AuditContext {
    *     compute operation mExecutionTime
    * @return this {@link AuditContext} instance
    */
-  public CoordinatorRPCAuditContext setCreationTimeNs(long creationTimeNs) {
+  public CoordinatorRPCAuditContext withCreationTimeNs(long creationTimeNs) {
     this.creationTimeNs = creationTimeNs;
     return this;
   }
@@ -71,7 +71,7 @@ public class CoordinatorRPCAuditContext implements 
AuditContext {
    * @param statusCode the status code
    * @return this {@link AuditContext} instance
    */
-  public CoordinatorRPCAuditContext setStatusCode(StatusCode statusCode) {
+  public CoordinatorRPCAuditContext withStatusCode(StatusCode statusCode) {
     if (statusCode == null) {
       this.statusCode = "UNKNOWN";
     } else {
@@ -86,7 +86,7 @@ public class CoordinatorRPCAuditContext implements 
AuditContext {
    * @param statusCode the status code
    * @return this {@link AuditContext} instance
    */
-  public CoordinatorRPCAuditContext setStatusCode(
+  public CoordinatorRPCAuditContext withStatusCode(
       org.apache.uniffle.proto.RssProtos.StatusCode statusCode) {
     if (statusCode == null) {
       this.statusCode = "UNKNOWN";
@@ -102,7 +102,7 @@ public class CoordinatorRPCAuditContext implements 
AuditContext {
    * @param statusCode the status code
    * @return this {@link AuditContext} instance
    */
-  public CoordinatorRPCAuditContext setStatusCode(String statusCode) {
+  public CoordinatorRPCAuditContext withStatusCode(String statusCode) {
     this.statusCode = statusCode;
     return this;
   }
@@ -128,17 +128,17 @@ public class CoordinatorRPCAuditContext implements 
AuditContext {
     return line;
   }
 
-  public CoordinatorRPCAuditContext setAppId(String appId) {
+  public CoordinatorRPCAuditContext withAppId(String appId) {
     this.appId = appId;
     return this;
   }
 
-  public CoordinatorRPCAuditContext setShuffleId(int shuffleId) {
+  public CoordinatorRPCAuditContext withShuffleId(int shuffleId) {
     this.shuffleId = shuffleId;
     return this;
   }
 
-  public CoordinatorRPCAuditContext setArgs(String args) {
+  public CoordinatorRPCAuditContext withArgs(String args) {
     this.args = args;
     return this;
   }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 249e7c376..b0fe9d9ef 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -127,10 +127,10 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
       StreamObserver<RssProtos.ShuffleUnregisterByAppIdResponse> 
responseStreamObserver) {
     try (ServerRPCAuditContext auditContext = 
createAuditContext("unregisterShuffleByAppId")) {
       String appId = request.getAppId();
-      auditContext.setAppId(appId);
+      auditContext.withAppId(appId);
       StatusCode status = verifyRequest(appId);
       if (status != StatusCode.SUCCESS) {
-        auditContext.setStatusCode(status);
+        auditContext.withStatusCode(status);
         RssProtos.ShuffleUnregisterByAppIdResponse reply =
             RssProtos.ShuffleUnregisterByAppIdResponse.newBuilder()
                 .setStatus(status.toProto())
@@ -148,7 +148,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
         status = StatusCode.INTERNAL_ERROR;
       }
 
-      auditContext.setStatusCode(status);
+      auditContext.withStatusCode(status);
       RssProtos.ShuffleUnregisterByAppIdResponse reply =
           RssProtos.ShuffleUnregisterByAppIdResponse.newBuilder()
               .setStatus(status.toProto())
@@ -166,10 +166,10 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
     try (ServerRPCAuditContext auditContext = 
createAuditContext("unregisterShuffle")) {
       String appId = request.getAppId();
       int shuffleId = request.getShuffleId();
-      auditContext.setAppId(appId).setShuffleId(shuffleId);
+      auditContext.withAppId(appId).withShuffleId(shuffleId);
       StatusCode status = verifyRequest(appId);
       if (status != StatusCode.SUCCESS) {
-        auditContext.setStatusCode(status);
+        auditContext.withStatusCode(status);
         RssProtos.ShuffleUnregisterResponse reply =
             RssProtos.ShuffleUnregisterResponse.newBuilder()
                 .setStatus(status.toProto())
@@ -186,7 +186,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
         status = StatusCode.INTERNAL_ERROR;
       }
 
-      auditContext.setStatusCode(status);
+      auditContext.withStatusCode(status);
       RssProtos.ShuffleUnregisterResponse reply =
           RssProtos.ShuffleUnregisterResponse.newBuilder()
               .setStatus(status.toProto())
@@ -207,8 +207,8 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
       String remoteStoragePath = req.getRemoteStorage().getPath();
       String user = req.getUser();
       int stageAttemptNumber = req.getStageAttemptNumber();
-      auditContext.setAppId(appId).setShuffleId(shuffleId);
-      auditContext.setArgs(
+      auditContext.withAppId(appId).withShuffleId(shuffleId);
+      auditContext.withArgs(
           "remoteStoragePath="
               + remoteStoragePath
               + ", user="
@@ -240,7 +240,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
                   shuffleId,
                   e);
               StatusCode code = StatusCode.INTERNAL_ERROR;
-              auditContext.setStatusCode(code);
+              auditContext.withStatusCode(code);
               reply = 
ShuffleRegisterResponse.newBuilder().setStatus(code.toProto()).build();
               responseObserver.onNext(reply);
               responseObserver.onCompleted();
@@ -250,7 +250,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
             // When a Stage retry occurs, the first or last registration of a 
Stage may need to be
             // ignored and the ignored status quickly returned.
             StatusCode code = StatusCode.STAGE_RETRY_IGNORE;
-            auditContext.setStatusCode(code);
+            auditContext.withStatusCode(code);
             reply = 
ShuffleRegisterResponse.newBuilder().setStatus(code.toProto()).build();
             responseObserver.onNext(reply);
             responseObserver.onCompleted();
@@ -296,7 +296,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
                   user,
                   shuffleDataDistributionType,
                   maxConcurrencyPerPartitionToWrite);
-      auditContext.setStatusCode(result);
+      auditContext.withStatusCode(result);
       reply = 
ShuffleRegisterResponse.newBuilder().setStatus(result.toProto()).build();
       responseObserver.onNext(reply);
       responseObserver.onCompleted();
@@ -314,8 +314,8 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
       long timestamp = req.getTimestamp();
       int stageAttemptNumber = req.getStageAttemptNumber();
 
-      auditContext.setAppId(appId).setShuffleId(shuffleId);
-      auditContext.setArgs(
+      auditContext.withAppId(appId).withShuffleId(shuffleId);
+      auditContext.withArgs(
           "requireBufferId="
               + requireBufferId
               + ", timestamp="
@@ -342,7 +342,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
                 .setStatus(StatusCode.APP_NOT_FOUND.toProto())
                 .setRetMsg(errorMsg)
                 .build();
-        auditContext.setStatusCode(StatusCode.fromProto(reply.getStatus()));
+        auditContext.withStatusCode(StatusCode.fromProto(reply.getStatus()));
         responseObserver.onNext(reply);
         responseObserver.onCompleted();
         return;
@@ -357,7 +357,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
                 .setStatus(StatusCode.STAGE_RETRY_IGNORE.toProto())
                 .setRetMsg(responseMessage)
                 .build();
-        auditContext.setStatusCode(StatusCode.fromProto(reply.getStatus()));
+        auditContext.withStatusCode(StatusCode.fromProto(reply.getStatus()));
         responseObserver.onNext(reply);
         responseObserver.onCompleted();
         return;
@@ -403,7 +403,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
                   .setStatus(StatusCode.INTERNAL_ERROR.toProto())
                   .setRetMsg(responseMessage)
                   .build();
-          auditContext.setStatusCode(StatusCode.fromProto(reply.getStatus()));
+          auditContext.withStatusCode(StatusCode.fromProto(reply.getStatus()));
           responseObserver.onNext(reply);
           responseObserver.onCompleted();
           return;
@@ -510,7 +510,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
                 .build();
       }
 
-      auditContext.setStatusCode(StatusCode.fromProto(reply.getStatus()));
+      auditContext.withStatusCode(StatusCode.fromProto(reply.getStatus()));
       responseObserver.onNext(reply);
       responseObserver.onCompleted();
     }
@@ -522,10 +522,10 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
     try (ServerRPCAuditContext auditContext = 
createAuditContext("commitShuffleTask")) {
       String appId = req.getAppId();
       int shuffleId = req.getShuffleId();
-      auditContext.setAppId(appId).setShuffleId(shuffleId);
+      auditContext.withAppId(appId).withShuffleId(shuffleId);
       StatusCode status = verifyRequest(appId);
       if (status != StatusCode.SUCCESS) {
-        auditContext.setStatusCode(status);
+        auditContext.withStatusCode(status);
         ShuffleCommitResponse response =
             ShuffleCommitResponse.newBuilder()
                 .setStatus(status.toProto())
@@ -544,6 +544,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
         }
         commitCount =
             
shuffleServer.getShuffleTaskManager().updateAndGetCommitCount(appId, shuffleId);
+        auditContext.withReturnValue("commitCount=" + commitCount);
         if (LOG.isDebugEnabled()) {
           LOG.debug(
               "Get commitShuffleTask request for appId["
@@ -560,7 +561,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
         LOG.error(msg, e);
       }
 
-      auditContext.setStatusCode(status);
+      auditContext.withStatusCode(status);
       ShuffleCommitResponse reply =
           ShuffleCommitResponse.newBuilder()
               .setCommitCount(commitCount)
@@ -578,10 +579,10 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
     try (ServerRPCAuditContext auditContext = 
createAuditContext("finishShuffle")) {
       String appId = req.getAppId();
       int shuffleId = req.getShuffleId();
-      auditContext.setAppId(appId).setShuffleId(shuffleId);
+      auditContext.withAppId(appId).withShuffleId(shuffleId);
       StatusCode status = verifyRequest(appId);
       if (status != StatusCode.SUCCESS) {
-        auditContext.setStatusCode(status);
+        auditContext.withStatusCode(status);
         FinishShuffleResponse response =
             FinishShuffleResponse.newBuilder()
                 .setStatus(status.toProto())
@@ -612,7 +613,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
         LOG.error(errorMsg, e);
       }
 
-      auditContext.setStatusCode(status);
+      auditContext.withStatusCode(status);
       FinishShuffleResponse response =
           
FinishShuffleResponse.newBuilder().setStatus(status.toProto()).setRetMsg(msg).build();
       responseObserver.onNext(response);
@@ -625,15 +626,15 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
       RequireBufferRequest request, StreamObserver<RequireBufferResponse> 
responseObserver) {
     try (ServerRPCAuditContext auditContext = 
createAuditContext("requireBuffer")) {
       String appId = request.getAppId();
-      auditContext.setAppId(appId).setShuffleId(request.getShuffleId());
+      auditContext.withAppId(appId).withShuffleId(request.getShuffleId());
       String auditArgs = "requireSize=" + request.getRequireSize();
       if (request.getPartitionIdsList() != null) {
         auditArgs += ", partitionIdsSize=" + 
request.getPartitionIdsList().size();
       }
-      auditContext.setArgs(auditArgs);
+      auditContext.withArgs(auditArgs);
       StatusCode status = verifyRequest(appId);
       if (status != StatusCode.SUCCESS) {
-        auditContext.setStatusCode(status);
+        auditContext.withStatusCode(status);
         RequireBufferResponse response =
             RequireBufferResponse.newBuilder()
                 .setStatus(status.toProto())
@@ -687,7 +688,8 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
                 + e.getMessage();
         LOG.error(responseMessage);
       }
-      auditContext.setStatusCode(status);
+      auditContext.withStatusCode(status);
+      auditContext.withReturnValue("requireBufferId=" + requireBufferId);
       RequireBufferResponse response =
           RequireBufferResponse.newBuilder()
               .setStatus(status.toProto())
@@ -704,10 +706,10 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
       AppHeartBeatRequest request, StreamObserver<AppHeartBeatResponse> 
responseObserver) {
     try (ServerRPCAuditContext auditContext = 
createAuditContext("appHeartbeat")) {
       String appId = request.getAppId();
-      auditContext.setAppId(appId);
+      auditContext.withAppId(appId);
       StatusCode status = verifyRequest(appId);
       if (status != StatusCode.SUCCESS) {
-        auditContext.setStatusCode(status);
+        auditContext.withStatusCode(status);
         AppHeartBeatResponse response =
             AppHeartBeatResponse.newBuilder()
                 .setStatus(status.toProto())
@@ -719,7 +721,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
       }
 
       if (Context.current().isCancelled()) {
-        auditContext.setStatusCode("CANCELLED");
+        auditContext.withStatusCode("CANCELLED");
         responseObserver.onError(
             Status.CANCELLED.withDescription("Cancelled by 
client").asRuntimeException());
         LOG.warn("Cancelled by client {} for after deadline.", appId);
@@ -727,7 +729,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
       }
 
       LOG.info("Get heartbeat from {}", appId);
-      auditContext.setStatusCode(StatusCode.SUCCESS);
+      auditContext.withStatusCode(StatusCode.SUCCESS);
       shuffleServer.getShuffleTaskManager().refreshAppId(appId);
       AppHeartBeatResponse response =
           AppHeartBeatResponse.newBuilder()
@@ -751,8 +753,8 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
       Map<Integer, long[]> partitionToBlockIds =
           toPartitionBlocksMap(request.getPartitionToBlockIdsList());
 
-      auditContext.setAppId(appId).setShuffleId(shuffleId);
-      auditContext.setArgs(
+      auditContext.withAppId(appId).withShuffleId(shuffleId);
+      auditContext.withArgs(
           "taskAttemptId="
               + taskAttemptId
               + ", bitmapNum="
@@ -762,7 +764,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
 
       StatusCode status = verifyRequest(appId);
       if (status != StatusCode.SUCCESS) {
-        auditContext.setStatusCode(status);
+        auditContext.withStatusCode(status);
         ReportShuffleResultResponse response =
             ReportShuffleResultResponse.newBuilder()
                 .setStatus(status.toProto())
@@ -809,7 +811,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
         LOG.error("Error happened when report shuffle result for " + 
requestInfo, e);
       }
 
-      auditContext.setStatusCode(status);
+      auditContext.withStatusCode(status);
       reply =
           ReportShuffleResultResponse.newBuilder()
               .setStatus(status.toProto())
@@ -833,12 +835,12 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
               request.getBlockIdLayout().getPartitionIdBits(),
               request.getBlockIdLayout().getTaskAttemptIdBits());
 
-      auditContext.setAppId(appId).setShuffleId(shuffleId);
-      auditContext.setArgs("partitionId=" + partitionId + ", blockIdLayout=" + 
blockIdLayout);
+      auditContext.withAppId(appId).withShuffleId(shuffleId);
+      auditContext.withArgs("partitionId=" + partitionId + ", blockIdLayout=" 
+ blockIdLayout);
 
       StatusCode status = verifyRequest(appId);
       if (status != StatusCode.SUCCESS) {
-        auditContext.setStatusCode(status);
+        auditContext.withStatusCode(status);
         GetShuffleResultResponse response =
             GetShuffleResultResponse.newBuilder()
                 .setStatus(status.toProto())
@@ -874,7 +876,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
         LOG.error("Error happened when get shuffle result for {}", 
requestInfo, e);
       }
 
-      auditContext.setStatusCode(status);
+      auditContext.withStatusCode(status);
       reply =
           GetShuffleResultResponse.newBuilder()
               .setStatus(status.toProto())
@@ -901,13 +903,13 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
               request.getBlockIdLayout().getPartitionIdBits(),
               request.getBlockIdLayout().getTaskAttemptIdBits());
 
-      auditContext.setAppId(appId).setShuffleId(shuffleId);
-      auditContext.setArgs(
+      auditContext.withAppId(appId).withShuffleId(shuffleId);
+      auditContext.withArgs(
           "partitionsListSize=" + partitionsList.size() + ", blockIdLayout=" + 
blockIdLayout);
 
       StatusCode status = verifyRequest(appId);
       if (status != StatusCode.SUCCESS) {
-        auditContext.setStatusCode(status);
+        auditContext.withStatusCode(status);
         GetShuffleResultForMultiPartResponse response =
             GetShuffleResultForMultiPartResponse.newBuilder()
                 .setStatus(status.toProto())
@@ -944,7 +946,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
         LOG.error("Error happened when get shuffle result for {}", 
requestInfo, e);
       }
 
-      auditContext.setStatusCode(status);
+      auditContext.withStatusCode(status);
       reply =
           GetShuffleResultForMultiPartResponse.newBuilder()
               .setStatus(status.toProto())
@@ -969,8 +971,8 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
       long offset = request.getOffset();
       int length = request.getLength();
 
-      auditContext.setAppId(appId).setShuffleId(shuffleId);
-      auditContext.setArgs(
+      auditContext.withAppId(appId).withShuffleId(shuffleId);
+      auditContext.withArgs(
           "partitionId="
               + partitionId
               + ", partitionNumPerRange="
@@ -984,7 +986,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
 
       StatusCode status = verifyRequest(appId);
       if (status != StatusCode.SUCCESS) {
-        auditContext.setStatusCode(status);
+        auditContext.withStatusCode(status);
         GetLocalShuffleDataResponse response =
             GetLocalShuffleDataResponse.newBuilder()
                 .setStatus(status.toProto())
@@ -1095,7 +1097,8 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
                 .setRetMsg(msg)
                 .build();
       }
-      auditContext.setStatusCode(status);
+      auditContext.withStatusCode(status);
+      auditContext.withReturnValue("len=" + (sdr == null ? 0 : 
sdr.getDataLength()));
       responseObserver.onNext(reply);
       responseObserver.onCompleted();
     }
@@ -1111,8 +1114,8 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
       int partitionId = request.getPartitionId();
       int partitionNumPerRange = request.getPartitionNumPerRange();
       int partitionNum = request.getPartitionNum();
-      auditContext.setAppId(appId).setShuffleId(shuffleId);
-      auditContext.setArgs(
+      auditContext.withAppId(appId).withShuffleId(shuffleId);
+      auditContext.withArgs(
           "partitionId="
               + partitionId
               + ", partitionNumPerRange="
@@ -1122,7 +1125,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
 
       StatusCode status = verifyRequest(appId);
       if (status != StatusCode.SUCCESS) {
-        auditContext.setStatusCode(status);
+        auditContext.withStatusCode(status);
         GetLocalShuffleIndexResponse reply =
             GetLocalShuffleIndexResponse.newBuilder()
                 .setStatus(status.toProto())
@@ -1184,6 +1187,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
 
           builder.setIndexData(UnsafeByteOperations.unsafeWrap(data));
           builder.setDataFileLen(shuffleIndexResult.getDataFileLen());
+          auditContext.withReturnValue("len=" + 
shuffleIndexResult.getDataFileLen());
           reply = builder.build();
         } catch (FileNotFoundException indexFileNotFoundException) {
           LOG.warn(
@@ -1218,7 +1222,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
                 .setRetMsg(msg)
                 .build();
       }
-      auditContext.setStatusCode(status);
+      auditContext.withStatusCode(status);
       responseObserver.onNext(reply);
       responseObserver.onCompleted();
     }
@@ -1235,8 +1239,8 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
       long blockId = request.getLastBlockId();
       int readBufferSize = request.getReadBufferSize();
 
-      auditContext.setAppId(appId).setShuffleId(shuffleId);
-      auditContext.setArgs(
+      auditContext.withAppId(appId).withShuffleId(shuffleId);
+      auditContext.withArgs(
           "partitionId="
               + partitionId
               + ", blockId="
@@ -1246,7 +1250,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
 
       StatusCode status = verifyRequest(appId);
       if (status != StatusCode.SUCCESS) {
-        auditContext.setStatusCode(status);
+        auditContext.withStatusCode(status);
         GetMemoryShuffleDataResponse reply =
             GetMemoryShuffleDataResponse.newBuilder()
                 .setStatus(status.toProto())
@@ -1308,6 +1312,8 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
               costTime,
               data.length,
               requestInfo);
+          auditContext.withReturnValue(
+              "len=" + data.length + ", bufferSegmentSize=" + 
bufferSegments.size());
           reply =
               GetMemoryShuffleDataResponse.newBuilder()
                   .setStatus(status.toProto())
@@ -1351,7 +1357,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
                 .build();
       }
 
-      auditContext.setStatusCode(status);
+      auditContext.withStatusCode(status);
       responseObserver.onNext(reply);
       responseObserver.onCompleted();
     }
@@ -1457,7 +1463,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
     }
     ServerRPCAuditContext auditContext = new 
ServerRPCAuditContext(auditLogger);
     if (auditLogger != null) {
-      auditContext.setCommand(command).setCreationTimeNs(System.nanoTime());
+      auditContext.withCommand(command).withCreationTimeNs(System.nanoTime());
     }
     return auditContext;
   }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/audit/ServerRPCAuditContext.java
 
b/server/src/main/java/org/apache/uniffle/server/audit/ServerRPCAuditContext.java
index 7001a1503..b78c606a5 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/audit/ServerRPCAuditContext.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/audit/ServerRPCAuditContext.java
@@ -32,6 +32,7 @@ public class ServerRPCAuditContext implements AuditContext {
   private String appId = "N/A";
   private int shuffleId = -1;
   private String args;
+  private String returnValue;
 
   /**
    * Constructor of {@link ServerRPCAuditContext}.
@@ -48,7 +49,7 @@ public class ServerRPCAuditContext implements AuditContext {
    * @param command the command associated with shuffle server rpc
    * @return this {@link AuditContext} instance
    */
-  public ServerRPCAuditContext setCommand(String command) {
+  public ServerRPCAuditContext withCommand(String command) {
     this.command = command;
     return this;
   }
@@ -60,7 +61,7 @@ public class ServerRPCAuditContext implements AuditContext {
    *     compute operation mExecutionTime
    * @return this {@link AuditContext} instance
    */
-  public ServerRPCAuditContext setCreationTimeNs(long creationTimeNs) {
+  public ServerRPCAuditContext withCreationTimeNs(long creationTimeNs) {
     this.creationTimeNs = creationTimeNs;
     return this;
   }
@@ -71,7 +72,7 @@ public class ServerRPCAuditContext implements AuditContext {
    * @param statusCode the status code
    * @return this {@link AuditContext} instance
    */
-  public ServerRPCAuditContext setStatusCode(StatusCode statusCode) {
+  public ServerRPCAuditContext withStatusCode(StatusCode statusCode) {
     this.statusCode = statusCode.name();
     return this;
   }
@@ -82,7 +83,7 @@ public class ServerRPCAuditContext implements AuditContext {
    * @param statusCode the status code
    * @return this {@link AuditContext} instance
    */
-  public ServerRPCAuditContext setStatusCode(String statusCode) {
+  public ServerRPCAuditContext withStatusCode(String statusCode) {
     this.statusCode = statusCode;
     return this;
   }
@@ -100,26 +101,34 @@ public class ServerRPCAuditContext implements 
AuditContext {
   public String toString() {
     String line =
         String.format(
-            
"cmd=%s\tstatusCode=%s\tappId=%s\tshuffleId=%s\texecutionTimeUs=%d\t",
+            
"cmd=%s\tstatusCode=%s\tappId=%s\tshuffleId=%s\texecutionTimeUs=%d",
             command, statusCode, appId, shuffleId, executionTimeNs / 1000);
     if (args != null) {
-      line += String.format("args{%s}", args);
+      line += String.format("\targs{%s}", args);
+    }
+    if (returnValue != null) {
+      line += String.format("\treturn{%s}", returnValue);
     }
     return line;
   }
 
-  public ServerRPCAuditContext setAppId(String appId) {
+  public ServerRPCAuditContext withAppId(String appId) {
     this.appId = appId;
     return this;
   }
 
-  public ServerRPCAuditContext setShuffleId(int shuffleId) {
+  public ServerRPCAuditContext withShuffleId(int shuffleId) {
     this.shuffleId = shuffleId;
     return this;
   }
 
-  public ServerRPCAuditContext setArgs(String args) {
+  public ServerRPCAuditContext withArgs(String args) {
     this.args = args;
     return this;
   }
+
+  public ServerRPCAuditContext withReturnValue(String returnValue) {
+    this.returnValue = returnValue;
+    return this;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index 0daa68449..fdb723dfb 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -140,8 +140,8 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
 
       boolean isPreAllocated = info != null;
 
-      auditContext.setAppId(appId).setShuffleId(shuffleId);
-      auditContext.setArgs(
+      auditContext.withAppId(appId).withShuffleId(shuffleId);
+      auditContext.withArgs(
           "requireBufferId="
               + requireBufferId
               + ", requireSize="
@@ -179,7 +179,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
             shuffleBufferManager,
             info,
             isPreAllocated);
-        auditContext.setStatusCode(rpcResponse.getStatusCode());
+        auditContext.withStatusCode(rpcResponse.getStatusCode());
         client.getChannel().writeAndFlush(rpcResponse);
         return;
       }
@@ -209,7 +209,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
             shuffleBufferManager,
             info,
             isPreAllocated);
-        auditContext.setStatusCode(rpcResponse.getStatusCode());
+        auditContext.withStatusCode(rpcResponse.getStatusCode());
         client.getChannel().writeAndFlush(rpcResponse);
         return;
       }
@@ -252,7 +252,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
                   + " in ShuffleServer's configuration";
           LOG.warn(errorMsg);
           rpcResponse = new RpcResponse(req.getRequestId(), 
StatusCode.INTERNAL_ERROR, errorMsg);
-          auditContext.setStatusCode(rpcResponse.getStatusCode());
+          auditContext.withStatusCode(rpcResponse.getStatusCode());
           client.getChannel().writeAndFlush(rpcResponse);
           return;
         }
@@ -352,7 +352,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
         rpcResponse =
             new RpcResponse(req.getRequestId(), StatusCode.INTERNAL_ERROR, "No 
data in request");
       }
-      auditContext.setStatusCode(rpcResponse.getStatusCode());
+      auditContext.withStatusCode(rpcResponse.getStatusCode());
       client.getChannel().writeAndFlush(rpcResponse);
     }
   }
@@ -392,9 +392,11 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
       int partitionId = req.getPartitionId();
       long blockId = req.getLastBlockId();
       int readBufferSize = req.getReadBufferSize();
-      auditContext.setAppId(appId).setShuffleId(shuffleId);
-      auditContext.setArgs(
-          "partitionId="
+      auditContext.withAppId(appId).withShuffleId(shuffleId);
+      auditContext.withArgs(
+          "requestId="
+              + req.getRequestId()
+              + ", partitionId="
               + partitionId
               + ", blockId="
               + blockId
@@ -402,7 +404,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
               + readBufferSize);
       StatusCode status = verifyRequest(appId);
       if (status != StatusCode.SUCCESS) {
-        auditContext.setStatusCode(status);
+        auditContext.withStatusCode(status);
         GetMemoryShuffleDataResponse response =
             new GetMemoryShuffleDataResponse(
                 req.getRequestId(),
@@ -453,7 +455,9 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
             ShuffleServerMetrics.gaugeReadMemoryDataThreadNum.inc();
             
ShuffleServerMetrics.gaugeReadMemoryDataBufferSize.inc(readBufferSize);
           }
-          auditContext.setStatusCode(status);
+          auditContext.withStatusCode(status);
+          auditContext.withReturnValue(
+              "len=" + data.size() + ", bufferSegments=" + 
bufferSegments.size());
           response =
               new GetMemoryShuffleDataResponse(
                   req.getRequestId(), status, msg, bufferSegments, data);
@@ -486,7 +490,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
             new GetMemoryShuffleDataResponse(
                 req.getRequestId(), status, msg, Lists.newArrayList(), 
Unpooled.EMPTY_BUFFER);
       }
-      auditContext.setStatusCode(response.getStatusCode());
+      auditContext.withStatusCode(response.getStatusCode());
       client.getChannel().writeAndFlush(response);
     }
   }
@@ -501,10 +505,12 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
       int partitionNum = req.getPartitionNum();
       String requestInfo =
           "appId[" + appId + "], shuffleId[" + shuffleId + "], partitionId[" + 
partitionId + "]";
-      auditContext.setAppId(appId);
-      auditContext.setShuffleId(shuffleId);
-      auditContext.setArgs(
-          "partitionId="
+      auditContext.withAppId(appId);
+      auditContext.withShuffleId(shuffleId);
+      auditContext.withArgs(
+          "requestId="
+              + req.getRequestId()
+              + ", partitionId="
               + partitionId
               + ", partitionNumPerRange="
               + partitionNumPerRange
@@ -512,7 +518,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
               + partitionNum);
       StatusCode status = verifyRequest(appId);
       if (status != StatusCode.SUCCESS) {
-        auditContext.setStatusCode(status);
+        auditContext.withStatusCode(status);
         GetLocalShuffleIndexResponse response =
             new GetLocalShuffleIndexResponse(
                 req.getRequestId(), status, status.toString(), 
Unpooled.EMPTY_BUFFER, 0L);
@@ -553,7 +559,8 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
           
ShuffleServerMetrics.counterTotalReadLocalIndexFileSize.inc(data.size());
           ShuffleServerMetrics.gaugeReadLocalIndexFileThreadNum.inc();
           
ShuffleServerMetrics.gaugeReadLocalIndexFileBufferSize.inc(assumedFileSize);
-          auditContext.setStatusCode(status);
+          auditContext.withStatusCode(status);
+          auditContext.withReturnValue("len=" + data.size());
           response =
               new GetLocalShuffleIndexResponse(
                   req.getRequestId(), status, msg, data, 
shuffleIndexResult.getDataFileLen());
@@ -594,7 +601,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
             new GetLocalShuffleIndexResponse(
                 req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L);
       }
-      auditContext.setStatusCode(response.getStatusCode());
+      auditContext.withStatusCode(response.getStatusCode());
       client.getChannel().writeAndFlush(response);
     }
   }
@@ -609,10 +616,12 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
       int partitionNum = req.getPartitionNum();
       long offset = req.getOffset();
       int length = req.getLength();
-      auditContext.setAppId(appId);
-      auditContext.setShuffleId(shuffleId);
-      auditContext.setArgs(
-          "partitionId="
+      auditContext.withAppId(appId);
+      auditContext.withShuffleId(shuffleId);
+      auditContext.withArgs(
+          "requestId="
+              + req.getRequestId()
+              + ", partitionId="
               + partitionId
               + ", partitionNumPerRange="
               + partitionNumPerRange
@@ -624,7 +633,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
               + length);
       StatusCode status = verifyRequest(appId);
       if (status != StatusCode.SUCCESS) {
-        auditContext.setStatusCode(status);
+        auditContext.withStatusCode(status);
         response =
             new GetLocalShuffleDataResponse(
                 req.getRequestId(),
@@ -696,7 +705,8 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
               new ReleaseMemoryAndRecordReadTimeListener(
                   start, length, sdr.getDataLength(), requestInfo, req, 
response, client);
           client.getChannel().writeAndFlush(response).addListener(listener);
-          auditContext.setStatusCode(response.getStatusCode());
+          auditContext.withStatusCode(response.getStatusCode());
+          auditContext.withReturnValue("len=" + sdr.getDataLength());
           return;
         } catch (Exception e) {
           shuffleServer.getShuffleBufferManager().releaseReadMemory(length);
@@ -718,7 +728,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
             new GetLocalShuffleDataResponse(
                 req.getRequestId(), status, msg, new 
NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
       }
-      auditContext.setStatusCode(response.getStatusCode());
+      auditContext.withStatusCode(response.getStatusCode());
       client.getChannel().writeAndFlush(response);
     }
   }
@@ -882,7 +892,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
     }
     ServerRPCAuditContext auditContext = new 
ServerRPCAuditContext(auditLogger);
     if (auditLogger != null) {
-      auditContext.setCommand(command).setCreationTimeNs(System.nanoTime());
+      auditContext.withCommand(command).withCreationTimeNs(System.nanoTime());
     }
     return auditContext;
   }

Reply via email to