This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new fe1f30469 [MINOR] improve(server): Add return value to server rpc
audit log (#2007)
fe1f30469 is described below
commit fe1f30469de74f6e7e36c7a39e45c47f1396d79e
Author: maobaolong <[email protected]>
AuthorDate: Mon Aug 12 10:21:46 2024 +0800
[MINOR] improve(server): Add return value to server rpc audit log (#2007)
### What changes were proposed in this pull request?
Add the return value to the server RPC audit log.
### Why are the changes needed?
With the return value recorded, we can clearly see the effect of each RPC call.
For example, a batch of `requireBuffer` calls appears in the RPC audit log,
followed by several `sendShuffleData` RPC calls that each carry a
`requireBufferId`. Currently we cannot tell which `requireBuffer` call matches
which `sendShuffleData` call.
With this PR, the audit log also records the `requireBufferId` generated by
each `requireBuffer` call. Searching for that `requireBufferId` then finds the
matching `sendShuffleData` call.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested locally.
---
.../coordinator/CoordinatorGrpcService.java | 46 ++++----
.../audit/CoordinatorRPCAuditContext.java | 16 +--
.../uniffle/server/ShuffleServerGrpcService.java | 118 +++++++++++----------
.../server/audit/ServerRPCAuditContext.java | 27 +++--
.../server/netty/ShuffleServerNettyHandler.java | 64 ++++++-----
5 files changed, 148 insertions(+), 123 deletions(-)
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index c26f8627d..d618211e0 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -135,8 +135,8 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
final int estimateTaskConcurrency = request.getEstimateTaskConcurrency();
final Set<String> faultyServerIds = new
HashSet<>(request.getFaultyServerIdsList());
- auditContext.setAppId(appId).setShuffleId(shuffleId);
- auditContext.setArgs(
+ auditContext.withAppId(appId).withShuffleId(shuffleId);
+ auditContext.withArgs(
String.format(
"partitionNum=%d, partitionNumPerRange=%d, replica=%d,
requiredTags=%s, "
+ "requiredShuffleServerNumber=%d, faultyServerIds=%s,
stageId=%d, stageAttemptNumber=%d, isReassign=%b",
@@ -205,7 +205,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
responseObserver.onNext(response);
} finally {
if (response != null) {
- auditContext.setStatusCode(response.getStatus());
+ auditContext.withStatusCode(response.getStatus());
}
responseObserver.onCompleted();
}
@@ -218,7 +218,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
StreamObserver<ShuffleServerHeartBeatResponse> responseObserver) {
try (CoordinatorRPCAuditContext auditContext =
createAuditContext("heartbeat")) {
final ServerNode serverNode = toServerNode(request);
- auditContext.setArgs("serverNode=" + serverNode.getId());
+ auditContext.withArgs("serverNode=" + serverNode.getId());
coordinatorServer.getClusterManager().add(serverNode);
final ShuffleServerHeartBeatResponse response =
ShuffleServerHeartBeatResponse.newBuilder()
@@ -228,7 +228,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
if (LOG.isDebugEnabled()) {
LOG.debug("Got heartbeat from {}", serverNode);
}
- auditContext.setStatusCode(response.getStatus());
+ auditContext.withStatusCode(response.getStatus());
responseObserver.onNext(response);
responseObserver.onCompleted();
}
@@ -242,7 +242,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
CheckServiceAvailableResponse.newBuilder()
.setAvailable(coordinatorServer.getClusterManager().getNodesNum() > 0)
.build();
- auditContext.setStatusCode(StatusCode.SUCCESS);
+ auditContext.withStatusCode(StatusCode.SUCCESS);
responseObserver.onNext(response);
responseObserver.onCompleted();
}
@@ -257,7 +257,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
final int clientPort = request.getClientPort();
final ShuffleServerId shuffleServer = request.getServer();
final String operation = request.getOperation();
- auditContext.setArgs(
+ auditContext.withArgs(
String.format("%s:%s->%s->%s", clientHost, clientPort, operation,
shuffleServer));
LOG.info(clientHost + ":" + clientPort + "->" + operation + "->" +
shuffleServer);
@@ -266,7 +266,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
.setRetMsg("")
.setStatus(StatusCode.SUCCESS)
.build();
- auditContext.setStatusCode(response.getStatus());
+ auditContext.withStatusCode(response.getStatus());
responseObserver.onNext(response);
responseObserver.onCompleted();
}
@@ -277,7 +277,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
AppHeartBeatRequest request, StreamObserver<AppHeartBeatResponse>
responseObserver) {
try (CoordinatorRPCAuditContext auditContext =
createAuditContext("appHeartbeat")) {
String appId = request.getAppId();
- auditContext.setAppId(appId);
+ auditContext.withAppId(appId);
coordinatorServer.getApplicationManager().refreshAppId(appId);
if (LOG.isDebugEnabled()) {
LOG.debug("Got heartbeat from application: {}", appId);
@@ -288,12 +288,12 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
if (Context.current().isCancelled()) {
responseObserver.onError(
Status.CANCELLED.withDescription("Cancelled by
client").asRuntimeException());
- auditContext.setStatusCode("CANCELLED");
+ auditContext.withStatusCode("CANCELLED");
LOG.warn("Cancelled by client {} for after deadline.", appId);
return;
}
- auditContext.setStatusCode(response.getStatus());
+ auditContext.withStatusCode(response.getStatus());
responseObserver.onNext(response);
responseObserver.onCompleted();
}
@@ -305,7 +305,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
try (CoordinatorRPCAuditContext auditContext =
createAuditContext("registerApplicationInfo")) {
String appId = request.getAppId();
String user = request.getUser();
- auditContext.setAppId(appId).setArgs("user=" + user);
+ auditContext.withAppId(appId).withArgs("user=" + user);
coordinatorServer
.getApplicationManager()
.registerApplicationInfo(appId, user, request.getVersion(),
request.getGitCommitId());
@@ -318,12 +318,12 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
if (Context.current().isCancelled()) {
responseObserver.onError(
Status.CANCELLED.withDescription("Cancelled by
client").asRuntimeException());
- auditContext.setStatusCode("CANCELLED");
+ auditContext.withStatusCode("CANCELLED");
LOG.warn("Cancelled by client {} for after deadline.", appId);
return;
}
- auditContext.setStatusCode(response.getStatus());
+ auditContext.withStatusCode(response.getStatus());
responseObserver.onNext(response);
responseObserver.onCompleted();
}
@@ -344,7 +344,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
request.getExtraPropertiesMap(),
request.getUser());
- auditContext.setArgs("accessInfo=" + accessInfo);
+ auditContext.withArgs("accessInfo=" + accessInfo);
AccessCheckResult result = accessManager.handleAccessRequest(accessInfo);
if (!result.isSuccess()) {
@@ -361,12 +361,12 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
if (Context.current().isCancelled()) {
responseObserver.onError(
Status.CANCELLED.withDescription("Cancelled by
client").asRuntimeException());
- auditContext.setStatusCode("CANCELLED");
+ auditContext.withStatusCode("CANCELLED");
LOG.warn("Cancelled by client {} for after deadline.", accessInfo);
return;
}
- auditContext.setStatusCode(response.getStatus());
+ auditContext.withStatusCode(response.getStatus());
responseObserver.onNext(response);
responseObserver.onCompleted();
}
@@ -378,7 +378,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
Empty empty, StreamObserver<FetchClientConfResponse> responseObserver) {
try (CoordinatorRPCAuditContext auditContext =
createAuditContext("fetchClientConf")) {
fetchClientConfImpl(RssClientConfFetchInfo.EMPTY_CLIENT_CONF_FETCH_INFO,
responseObserver);
- auditContext.setStatusCode(StatusCode.SUCCESS);
+ auditContext.withStatusCode(StatusCode.SUCCESS);
}
}
@@ -387,7 +387,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
FetchClientConfRequest request, StreamObserver<FetchClientConfResponse>
responseObserver) {
try (CoordinatorRPCAuditContext auditContext =
createAuditContext("fetchClientConfV2")) {
fetchClientConfImpl(RssClientConfFetchInfo.fromProto(request),
responseObserver);
- auditContext.setStatusCode(StatusCode.SUCCESS);
+ auditContext.withStatusCode(StatusCode.SUCCESS);
}
}
@@ -430,7 +430,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
FetchRemoteStorageResponse response;
StatusCode status = StatusCode.SUCCESS;
String appId = request.getAppId();
- auditContext.setAppId(appId);
+ auditContext.withAppId(appId);
try {
RemoteStorage.Builder rsBuilder = RemoteStorage.newBuilder();
RemoteStorageInfo rsInfo =
@@ -461,12 +461,12 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
if (Context.current().isCancelled()) {
responseObserver.onError(
Status.CANCELLED.withDescription("Cancelled by
client").asRuntimeException());
- auditContext.setStatusCode("CANCELLED");
+ auditContext.withStatusCode("CANCELLED");
LOG.warn("Fetch client conf cancelled by client for after deadline.");
return;
}
- auditContext.setStatusCode(response.getStatus());
+ auditContext.withStatusCode(response.getStatus());
responseObserver.onNext(response);
responseObserver.onCompleted();
}
@@ -536,7 +536,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
}
CoordinatorRPCAuditContext auditContext = new
CoordinatorRPCAuditContext(auditLogger);
if (auditLogger != null) {
- auditContext.setCommand(command).setCreationTimeNs(System.nanoTime());
+ auditContext.withCommand(command).withCreationTimeNs(System.nanoTime());
}
return auditContext;
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRPCAuditContext.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRPCAuditContext.java
index 7a6453bfb..a66bd2805 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRPCAuditContext.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRPCAuditContext.java
@@ -48,7 +48,7 @@ public class CoordinatorRPCAuditContext implements
AuditContext {
* @param command the command associated with shuffle server rpc
* @return this {@link AuditContext} instance
*/
- public CoordinatorRPCAuditContext setCommand(String command) {
+ public CoordinatorRPCAuditContext withCommand(String command) {
this.command = command;
return this;
}
@@ -60,7 +60,7 @@ public class CoordinatorRPCAuditContext implements
AuditContext {
* compute operation mExecutionTime
* @return this {@link AuditContext} instance
*/
- public CoordinatorRPCAuditContext setCreationTimeNs(long creationTimeNs) {
+ public CoordinatorRPCAuditContext withCreationTimeNs(long creationTimeNs) {
this.creationTimeNs = creationTimeNs;
return this;
}
@@ -71,7 +71,7 @@ public class CoordinatorRPCAuditContext implements
AuditContext {
* @param statusCode the status code
* @return this {@link AuditContext} instance
*/
- public CoordinatorRPCAuditContext setStatusCode(StatusCode statusCode) {
+ public CoordinatorRPCAuditContext withStatusCode(StatusCode statusCode) {
if (statusCode == null) {
this.statusCode = "UNKNOWN";
} else {
@@ -86,7 +86,7 @@ public class CoordinatorRPCAuditContext implements
AuditContext {
* @param statusCode the status code
* @return this {@link AuditContext} instance
*/
- public CoordinatorRPCAuditContext setStatusCode(
+ public CoordinatorRPCAuditContext withStatusCode(
org.apache.uniffle.proto.RssProtos.StatusCode statusCode) {
if (statusCode == null) {
this.statusCode = "UNKNOWN";
@@ -102,7 +102,7 @@ public class CoordinatorRPCAuditContext implements
AuditContext {
* @param statusCode the status code
* @return this {@link AuditContext} instance
*/
- public CoordinatorRPCAuditContext setStatusCode(String statusCode) {
+ public CoordinatorRPCAuditContext withStatusCode(String statusCode) {
this.statusCode = statusCode;
return this;
}
@@ -128,17 +128,17 @@ public class CoordinatorRPCAuditContext implements
AuditContext {
return line;
}
- public CoordinatorRPCAuditContext setAppId(String appId) {
+ public CoordinatorRPCAuditContext withAppId(String appId) {
this.appId = appId;
return this;
}
- public CoordinatorRPCAuditContext setShuffleId(int shuffleId) {
+ public CoordinatorRPCAuditContext withShuffleId(int shuffleId) {
this.shuffleId = shuffleId;
return this;
}
- public CoordinatorRPCAuditContext setArgs(String args) {
+ public CoordinatorRPCAuditContext withArgs(String args) {
this.args = args;
return this;
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 249e7c376..b0fe9d9ef 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -127,10 +127,10 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
StreamObserver<RssProtos.ShuffleUnregisterByAppIdResponse>
responseStreamObserver) {
try (ServerRPCAuditContext auditContext =
createAuditContext("unregisterShuffleByAppId")) {
String appId = request.getAppId();
- auditContext.setAppId(appId);
+ auditContext.withAppId(appId);
StatusCode status = verifyRequest(appId);
if (status != StatusCode.SUCCESS) {
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
RssProtos.ShuffleUnregisterByAppIdResponse reply =
RssProtos.ShuffleUnregisterByAppIdResponse.newBuilder()
.setStatus(status.toProto())
@@ -148,7 +148,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
status = StatusCode.INTERNAL_ERROR;
}
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
RssProtos.ShuffleUnregisterByAppIdResponse reply =
RssProtos.ShuffleUnregisterByAppIdResponse.newBuilder()
.setStatus(status.toProto())
@@ -166,10 +166,10 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
try (ServerRPCAuditContext auditContext =
createAuditContext("unregisterShuffle")) {
String appId = request.getAppId();
int shuffleId = request.getShuffleId();
- auditContext.setAppId(appId).setShuffleId(shuffleId);
+ auditContext.withAppId(appId).withShuffleId(shuffleId);
StatusCode status = verifyRequest(appId);
if (status != StatusCode.SUCCESS) {
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
RssProtos.ShuffleUnregisterResponse reply =
RssProtos.ShuffleUnregisterResponse.newBuilder()
.setStatus(status.toProto())
@@ -186,7 +186,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
status = StatusCode.INTERNAL_ERROR;
}
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
RssProtos.ShuffleUnregisterResponse reply =
RssProtos.ShuffleUnregisterResponse.newBuilder()
.setStatus(status.toProto())
@@ -207,8 +207,8 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
String remoteStoragePath = req.getRemoteStorage().getPath();
String user = req.getUser();
int stageAttemptNumber = req.getStageAttemptNumber();
- auditContext.setAppId(appId).setShuffleId(shuffleId);
- auditContext.setArgs(
+ auditContext.withAppId(appId).withShuffleId(shuffleId);
+ auditContext.withArgs(
"remoteStoragePath="
+ remoteStoragePath
+ ", user="
@@ -240,7 +240,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
shuffleId,
e);
StatusCode code = StatusCode.INTERNAL_ERROR;
- auditContext.setStatusCode(code);
+ auditContext.withStatusCode(code);
reply =
ShuffleRegisterResponse.newBuilder().setStatus(code.toProto()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
@@ -250,7 +250,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
// When a Stage retry occurs, the first or last registration of a
Stage may need to be
// ignored and the ignored status quickly returned.
StatusCode code = StatusCode.STAGE_RETRY_IGNORE;
- auditContext.setStatusCode(code);
+ auditContext.withStatusCode(code);
reply =
ShuffleRegisterResponse.newBuilder().setStatus(code.toProto()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
@@ -296,7 +296,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
user,
shuffleDataDistributionType,
maxConcurrencyPerPartitionToWrite);
- auditContext.setStatusCode(result);
+ auditContext.withStatusCode(result);
reply =
ShuffleRegisterResponse.newBuilder().setStatus(result.toProto()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
@@ -314,8 +314,8 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
long timestamp = req.getTimestamp();
int stageAttemptNumber = req.getStageAttemptNumber();
- auditContext.setAppId(appId).setShuffleId(shuffleId);
- auditContext.setArgs(
+ auditContext.withAppId(appId).withShuffleId(shuffleId);
+ auditContext.withArgs(
"requireBufferId="
+ requireBufferId
+ ", timestamp="
@@ -342,7 +342,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
.setStatus(StatusCode.APP_NOT_FOUND.toProto())
.setRetMsg(errorMsg)
.build();
- auditContext.setStatusCode(StatusCode.fromProto(reply.getStatus()));
+ auditContext.withStatusCode(StatusCode.fromProto(reply.getStatus()));
responseObserver.onNext(reply);
responseObserver.onCompleted();
return;
@@ -357,7 +357,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
.setStatus(StatusCode.STAGE_RETRY_IGNORE.toProto())
.setRetMsg(responseMessage)
.build();
- auditContext.setStatusCode(StatusCode.fromProto(reply.getStatus()));
+ auditContext.withStatusCode(StatusCode.fromProto(reply.getStatus()));
responseObserver.onNext(reply);
responseObserver.onCompleted();
return;
@@ -403,7 +403,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
.setStatus(StatusCode.INTERNAL_ERROR.toProto())
.setRetMsg(responseMessage)
.build();
- auditContext.setStatusCode(StatusCode.fromProto(reply.getStatus()));
+ auditContext.withStatusCode(StatusCode.fromProto(reply.getStatus()));
responseObserver.onNext(reply);
responseObserver.onCompleted();
return;
@@ -510,7 +510,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
.build();
}
- auditContext.setStatusCode(StatusCode.fromProto(reply.getStatus()));
+ auditContext.withStatusCode(StatusCode.fromProto(reply.getStatus()));
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@@ -522,10 +522,10 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
try (ServerRPCAuditContext auditContext =
createAuditContext("commitShuffleTask")) {
String appId = req.getAppId();
int shuffleId = req.getShuffleId();
- auditContext.setAppId(appId).setShuffleId(shuffleId);
+ auditContext.withAppId(appId).withShuffleId(shuffleId);
StatusCode status = verifyRequest(appId);
if (status != StatusCode.SUCCESS) {
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
ShuffleCommitResponse response =
ShuffleCommitResponse.newBuilder()
.setStatus(status.toProto())
@@ -544,6 +544,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
}
commitCount =
shuffleServer.getShuffleTaskManager().updateAndGetCommitCount(appId, shuffleId);
+ auditContext.withReturnValue("commitCount=" + commitCount);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Get commitShuffleTask request for appId["
@@ -560,7 +561,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
LOG.error(msg, e);
}
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
ShuffleCommitResponse reply =
ShuffleCommitResponse.newBuilder()
.setCommitCount(commitCount)
@@ -578,10 +579,10 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
try (ServerRPCAuditContext auditContext =
createAuditContext("finishShuffle")) {
String appId = req.getAppId();
int shuffleId = req.getShuffleId();
- auditContext.setAppId(appId).setShuffleId(shuffleId);
+ auditContext.withAppId(appId).withShuffleId(shuffleId);
StatusCode status = verifyRequest(appId);
if (status != StatusCode.SUCCESS) {
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
FinishShuffleResponse response =
FinishShuffleResponse.newBuilder()
.setStatus(status.toProto())
@@ -612,7 +613,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
LOG.error(errorMsg, e);
}
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
FinishShuffleResponse response =
FinishShuffleResponse.newBuilder().setStatus(status.toProto()).setRetMsg(msg).build();
responseObserver.onNext(response);
@@ -625,15 +626,15 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
RequireBufferRequest request, StreamObserver<RequireBufferResponse>
responseObserver) {
try (ServerRPCAuditContext auditContext =
createAuditContext("requireBuffer")) {
String appId = request.getAppId();
- auditContext.setAppId(appId).setShuffleId(request.getShuffleId());
+ auditContext.withAppId(appId).withShuffleId(request.getShuffleId());
String auditArgs = "requireSize=" + request.getRequireSize();
if (request.getPartitionIdsList() != null) {
auditArgs += ", partitionIdsSize=" +
request.getPartitionIdsList().size();
}
- auditContext.setArgs(auditArgs);
+ auditContext.withArgs(auditArgs);
StatusCode status = verifyRequest(appId);
if (status != StatusCode.SUCCESS) {
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
RequireBufferResponse response =
RequireBufferResponse.newBuilder()
.setStatus(status.toProto())
@@ -687,7 +688,8 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
+ e.getMessage();
LOG.error(responseMessage);
}
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
+ auditContext.withReturnValue("requireBufferId=" + requireBufferId);
RequireBufferResponse response =
RequireBufferResponse.newBuilder()
.setStatus(status.toProto())
@@ -704,10 +706,10 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
AppHeartBeatRequest request, StreamObserver<AppHeartBeatResponse>
responseObserver) {
try (ServerRPCAuditContext auditContext =
createAuditContext("appHeartbeat")) {
String appId = request.getAppId();
- auditContext.setAppId(appId);
+ auditContext.withAppId(appId);
StatusCode status = verifyRequest(appId);
if (status != StatusCode.SUCCESS) {
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
AppHeartBeatResponse response =
AppHeartBeatResponse.newBuilder()
.setStatus(status.toProto())
@@ -719,7 +721,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
}
if (Context.current().isCancelled()) {
- auditContext.setStatusCode("CANCELLED");
+ auditContext.withStatusCode("CANCELLED");
responseObserver.onError(
Status.CANCELLED.withDescription("Cancelled by
client").asRuntimeException());
LOG.warn("Cancelled by client {} for after deadline.", appId);
@@ -727,7 +729,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
}
LOG.info("Get heartbeat from {}", appId);
- auditContext.setStatusCode(StatusCode.SUCCESS);
+ auditContext.withStatusCode(StatusCode.SUCCESS);
shuffleServer.getShuffleTaskManager().refreshAppId(appId);
AppHeartBeatResponse response =
AppHeartBeatResponse.newBuilder()
@@ -751,8 +753,8 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
Map<Integer, long[]> partitionToBlockIds =
toPartitionBlocksMap(request.getPartitionToBlockIdsList());
- auditContext.setAppId(appId).setShuffleId(shuffleId);
- auditContext.setArgs(
+ auditContext.withAppId(appId).withShuffleId(shuffleId);
+ auditContext.withArgs(
"taskAttemptId="
+ taskAttemptId
+ ", bitmapNum="
@@ -762,7 +764,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
StatusCode status = verifyRequest(appId);
if (status != StatusCode.SUCCESS) {
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
ReportShuffleResultResponse response =
ReportShuffleResultResponse.newBuilder()
.setStatus(status.toProto())
@@ -809,7 +811,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
LOG.error("Error happened when report shuffle result for " +
requestInfo, e);
}
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
reply =
ReportShuffleResultResponse.newBuilder()
.setStatus(status.toProto())
@@ -833,12 +835,12 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
request.getBlockIdLayout().getPartitionIdBits(),
request.getBlockIdLayout().getTaskAttemptIdBits());
- auditContext.setAppId(appId).setShuffleId(shuffleId);
- auditContext.setArgs("partitionId=" + partitionId + ", blockIdLayout=" +
blockIdLayout);
+ auditContext.withAppId(appId).withShuffleId(shuffleId);
+ auditContext.withArgs("partitionId=" + partitionId + ", blockIdLayout="
+ blockIdLayout);
StatusCode status = verifyRequest(appId);
if (status != StatusCode.SUCCESS) {
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
GetShuffleResultResponse response =
GetShuffleResultResponse.newBuilder()
.setStatus(status.toProto())
@@ -874,7 +876,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
LOG.error("Error happened when get shuffle result for {}",
requestInfo, e);
}
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
reply =
GetShuffleResultResponse.newBuilder()
.setStatus(status.toProto())
@@ -901,13 +903,13 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
request.getBlockIdLayout().getPartitionIdBits(),
request.getBlockIdLayout().getTaskAttemptIdBits());
- auditContext.setAppId(appId).setShuffleId(shuffleId);
- auditContext.setArgs(
+ auditContext.withAppId(appId).withShuffleId(shuffleId);
+ auditContext.withArgs(
"partitionsListSize=" + partitionsList.size() + ", blockIdLayout=" +
blockIdLayout);
StatusCode status = verifyRequest(appId);
if (status != StatusCode.SUCCESS) {
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
GetShuffleResultForMultiPartResponse response =
GetShuffleResultForMultiPartResponse.newBuilder()
.setStatus(status.toProto())
@@ -944,7 +946,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
LOG.error("Error happened when get shuffle result for {}",
requestInfo, e);
}
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
reply =
GetShuffleResultForMultiPartResponse.newBuilder()
.setStatus(status.toProto())
@@ -969,8 +971,8 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
long offset = request.getOffset();
int length = request.getLength();
- auditContext.setAppId(appId).setShuffleId(shuffleId);
- auditContext.setArgs(
+ auditContext.withAppId(appId).withShuffleId(shuffleId);
+ auditContext.withArgs(
"partitionId="
+ partitionId
+ ", partitionNumPerRange="
@@ -984,7 +986,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
StatusCode status = verifyRequest(appId);
if (status != StatusCode.SUCCESS) {
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
GetLocalShuffleDataResponse response =
GetLocalShuffleDataResponse.newBuilder()
.setStatus(status.toProto())
@@ -1095,7 +1097,8 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
.setRetMsg(msg)
.build();
}
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
+ auditContext.withReturnValue("len=" + (sdr == null ? 0 :
sdr.getDataLength()));
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@@ -1111,8 +1114,8 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
int partitionId = request.getPartitionId();
int partitionNumPerRange = request.getPartitionNumPerRange();
int partitionNum = request.getPartitionNum();
- auditContext.setAppId(appId).setShuffleId(shuffleId);
- auditContext.setArgs(
+ auditContext.withAppId(appId).withShuffleId(shuffleId);
+ auditContext.withArgs(
"partitionId="
+ partitionId
+ ", partitionNumPerRange="
@@ -1122,7 +1125,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
StatusCode status = verifyRequest(appId);
if (status != StatusCode.SUCCESS) {
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
GetLocalShuffleIndexResponse reply =
GetLocalShuffleIndexResponse.newBuilder()
.setStatus(status.toProto())
@@ -1184,6 +1187,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
builder.setIndexData(UnsafeByteOperations.unsafeWrap(data));
builder.setDataFileLen(shuffleIndexResult.getDataFileLen());
+ auditContext.withReturnValue("len=" +
shuffleIndexResult.getDataFileLen());
reply = builder.build();
} catch (FileNotFoundException indexFileNotFoundException) {
LOG.warn(
@@ -1218,7 +1222,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
.setRetMsg(msg)
.build();
}
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@@ -1235,8 +1239,8 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
long blockId = request.getLastBlockId();
int readBufferSize = request.getReadBufferSize();
- auditContext.setAppId(appId).setShuffleId(shuffleId);
- auditContext.setArgs(
+ auditContext.withAppId(appId).withShuffleId(shuffleId);
+ auditContext.withArgs(
"partitionId="
+ partitionId
+ ", blockId="
@@ -1246,7 +1250,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
StatusCode status = verifyRequest(appId);
if (status != StatusCode.SUCCESS) {
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
GetMemoryShuffleDataResponse reply =
GetMemoryShuffleDataResponse.newBuilder()
.setStatus(status.toProto())
@@ -1308,6 +1312,8 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
costTime,
data.length,
requestInfo);
+ auditContext.withReturnValue(
+ "len=" + data.length + ", bufferSegmentSize=" +
bufferSegments.size());
reply =
GetMemoryShuffleDataResponse.newBuilder()
.setStatus(status.toProto())
@@ -1351,7 +1357,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
.build();
}
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@@ -1457,7 +1463,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
}
ServerRPCAuditContext auditContext = new
ServerRPCAuditContext(auditLogger);
if (auditLogger != null) {
- auditContext.setCommand(command).setCreationTimeNs(System.nanoTime());
+ auditContext.withCommand(command).withCreationTimeNs(System.nanoTime());
}
return auditContext;
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/audit/ServerRPCAuditContext.java
b/server/src/main/java/org/apache/uniffle/server/audit/ServerRPCAuditContext.java
index 7001a1503..b78c606a5 100644
---
a/server/src/main/java/org/apache/uniffle/server/audit/ServerRPCAuditContext.java
+++
b/server/src/main/java/org/apache/uniffle/server/audit/ServerRPCAuditContext.java
@@ -32,6 +32,7 @@ public class ServerRPCAuditContext implements AuditContext {
private String appId = "N/A";
private int shuffleId = -1;
private String args;
+ private String returnValue;
/**
* Constructor of {@link ServerRPCAuditContext}.
@@ -48,7 +49,7 @@ public class ServerRPCAuditContext implements AuditContext {
* @param command the command associated with shuffle server rpc
* @return this {@link AuditContext} instance
*/
- public ServerRPCAuditContext setCommand(String command) {
+ public ServerRPCAuditContext withCommand(String command) {
this.command = command;
return this;
}
@@ -60,7 +61,7 @@ public class ServerRPCAuditContext implements AuditContext {
* compute operation mExecutionTime
* @return this {@link AuditContext} instance
*/
- public ServerRPCAuditContext setCreationTimeNs(long creationTimeNs) {
+ public ServerRPCAuditContext withCreationTimeNs(long creationTimeNs) {
this.creationTimeNs = creationTimeNs;
return this;
}
@@ -71,7 +72,7 @@ public class ServerRPCAuditContext implements AuditContext {
* @param statusCode the status code
* @return this {@link AuditContext} instance
*/
- public ServerRPCAuditContext setStatusCode(StatusCode statusCode) {
+ public ServerRPCAuditContext withStatusCode(StatusCode statusCode) {
this.statusCode = statusCode.name();
return this;
}
@@ -82,7 +83,7 @@ public class ServerRPCAuditContext implements AuditContext {
* @param statusCode the status code
* @return this {@link AuditContext} instance
*/
- public ServerRPCAuditContext setStatusCode(String statusCode) {
+ public ServerRPCAuditContext withStatusCode(String statusCode) {
this.statusCode = statusCode;
return this;
}
@@ -100,26 +101,34 @@ public class ServerRPCAuditContext implements AuditContext {
public String toString() {
String line =
String.format(
-
"cmd=%s\tstatusCode=%s\tappId=%s\tshuffleId=%s\texecutionTimeUs=%d\t",
+
"cmd=%s\tstatusCode=%s\tappId=%s\tshuffleId=%s\texecutionTimeUs=%d",
command, statusCode, appId, shuffleId, executionTimeNs / 1000);
if (args != null) {
- line += String.format("args{%s}", args);
+ line += String.format("\targs{%s}", args);
+ }
+ if (returnValue != null) {
+ line += String.format("\treturn{%s}", returnValue);
}
return line;
}
- public ServerRPCAuditContext setAppId(String appId) {
+ public ServerRPCAuditContext withAppId(String appId) {
this.appId = appId;
return this;
}
- public ServerRPCAuditContext setShuffleId(int shuffleId) {
+ public ServerRPCAuditContext withShuffleId(int shuffleId) {
this.shuffleId = shuffleId;
return this;
}
- public ServerRPCAuditContext setArgs(String args) {
+ public ServerRPCAuditContext withArgs(String args) {
this.args = args;
return this;
}
+
+ public ServerRPCAuditContext withReturnValue(String returnValue) {
+ this.returnValue = returnValue;
+ return this;
+ }
}
diff --git a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index 0daa68449..fdb723dfb 100644
--- a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -140,8 +140,8 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
boolean isPreAllocated = info != null;
- auditContext.setAppId(appId).setShuffleId(shuffleId);
- auditContext.setArgs(
+ auditContext.withAppId(appId).withShuffleId(shuffleId);
+ auditContext.withArgs(
"requireBufferId="
+ requireBufferId
+ ", requireSize="
@@ -179,7 +179,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
shuffleBufferManager,
info,
isPreAllocated);
- auditContext.setStatusCode(rpcResponse.getStatusCode());
+ auditContext.withStatusCode(rpcResponse.getStatusCode());
client.getChannel().writeAndFlush(rpcResponse);
return;
}
@@ -209,7 +209,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
shuffleBufferManager,
info,
isPreAllocated);
- auditContext.setStatusCode(rpcResponse.getStatusCode());
+ auditContext.withStatusCode(rpcResponse.getStatusCode());
client.getChannel().writeAndFlush(rpcResponse);
return;
}
@@ -252,7 +252,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
+ " in ShuffleServer's configuration";
LOG.warn(errorMsg);
rpcResponse = new RpcResponse(req.getRequestId(),
StatusCode.INTERNAL_ERROR, errorMsg);
- auditContext.setStatusCode(rpcResponse.getStatusCode());
+ auditContext.withStatusCode(rpcResponse.getStatusCode());
client.getChannel().writeAndFlush(rpcResponse);
return;
}
@@ -352,7 +352,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
rpcResponse =
new RpcResponse(req.getRequestId(), StatusCode.INTERNAL_ERROR, "No
data in request");
}
- auditContext.setStatusCode(rpcResponse.getStatusCode());
+ auditContext.withStatusCode(rpcResponse.getStatusCode());
client.getChannel().writeAndFlush(rpcResponse);
}
}
@@ -392,9 +392,11 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
int partitionId = req.getPartitionId();
long blockId = req.getLastBlockId();
int readBufferSize = req.getReadBufferSize();
- auditContext.setAppId(appId).setShuffleId(shuffleId);
- auditContext.setArgs(
- "partitionId="
+ auditContext.withAppId(appId).withShuffleId(shuffleId);
+ auditContext.withArgs(
+ "requestId="
+ + req.getRequestId()
+ + ", partitionId="
+ partitionId
+ ", blockId="
+ blockId
@@ -402,7 +404,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
+ readBufferSize);
StatusCode status = verifyRequest(appId);
if (status != StatusCode.SUCCESS) {
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
GetMemoryShuffleDataResponse response =
new GetMemoryShuffleDataResponse(
req.getRequestId(),
@@ -453,7 +455,9 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
ShuffleServerMetrics.gaugeReadMemoryDataThreadNum.inc();
ShuffleServerMetrics.gaugeReadMemoryDataBufferSize.inc(readBufferSize);
}
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
+ auditContext.withReturnValue(
+ "len=" + data.size() + ", bufferSegments=" +
bufferSegments.size());
response =
new GetMemoryShuffleDataResponse(
req.getRequestId(), status, msg, bufferSegments, data);
@@ -486,7 +490,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
new GetMemoryShuffleDataResponse(
req.getRequestId(), status, msg, Lists.newArrayList(),
Unpooled.EMPTY_BUFFER);
}
- auditContext.setStatusCode(response.getStatusCode());
+ auditContext.withStatusCode(response.getStatusCode());
client.getChannel().writeAndFlush(response);
}
}
@@ -501,10 +505,12 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
int partitionNum = req.getPartitionNum();
String requestInfo =
"appId[" + appId + "], shuffleId[" + shuffleId + "], partitionId[" +
partitionId + "]";
- auditContext.setAppId(appId);
- auditContext.setShuffleId(shuffleId);
- auditContext.setArgs(
- "partitionId="
+ auditContext.withAppId(appId);
+ auditContext.withShuffleId(shuffleId);
+ auditContext.withArgs(
+ "requestId="
+ + req.getRequestId()
+ + ", partitionId="
+ partitionId
+ ", partitionNumPerRange="
+ partitionNumPerRange
@@ -512,7 +518,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
+ partitionNum);
StatusCode status = verifyRequest(appId);
if (status != StatusCode.SUCCESS) {
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
GetLocalShuffleIndexResponse response =
new GetLocalShuffleIndexResponse(
req.getRequestId(), status, status.toString(),
Unpooled.EMPTY_BUFFER, 0L);
@@ -553,7 +559,8 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
ShuffleServerMetrics.counterTotalReadLocalIndexFileSize.inc(data.size());
ShuffleServerMetrics.gaugeReadLocalIndexFileThreadNum.inc();
ShuffleServerMetrics.gaugeReadLocalIndexFileBufferSize.inc(assumedFileSize);
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
+ auditContext.withReturnValue("len=" + data.size());
response =
new GetLocalShuffleIndexResponse(
req.getRequestId(), status, msg, data,
shuffleIndexResult.getDataFileLen());
@@ -594,7 +601,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
new GetLocalShuffleIndexResponse(
req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L);
}
- auditContext.setStatusCode(response.getStatusCode());
+ auditContext.withStatusCode(response.getStatusCode());
client.getChannel().writeAndFlush(response);
}
}
@@ -609,10 +616,12 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
int partitionNum = req.getPartitionNum();
long offset = req.getOffset();
int length = req.getLength();
- auditContext.setAppId(appId);
- auditContext.setShuffleId(shuffleId);
- auditContext.setArgs(
- "partitionId="
+ auditContext.withAppId(appId);
+ auditContext.withShuffleId(shuffleId);
+ auditContext.withArgs(
+ "requestId="
+ + req.getRequestId()
+ + ", partitionId="
+ partitionId
+ ", partitionNumPerRange="
+ partitionNumPerRange
@@ -624,7 +633,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
+ length);
StatusCode status = verifyRequest(appId);
if (status != StatusCode.SUCCESS) {
- auditContext.setStatusCode(status);
+ auditContext.withStatusCode(status);
response =
new GetLocalShuffleDataResponse(
req.getRequestId(),
@@ -696,7 +705,8 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
new ReleaseMemoryAndRecordReadTimeListener(
start, length, sdr.getDataLength(), requestInfo, req,
response, client);
client.getChannel().writeAndFlush(response).addListener(listener);
- auditContext.setStatusCode(response.getStatusCode());
+ auditContext.withStatusCode(response.getStatusCode());
+ auditContext.withReturnValue("len=" + sdr.getDataLength());
return;
} catch (Exception e) {
shuffleServer.getShuffleBufferManager().releaseReadMemory(length);
@@ -718,7 +728,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
new GetLocalShuffleDataResponse(
req.getRequestId(), status, msg, new
NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
}
- auditContext.setStatusCode(response.getStatusCode());
+ auditContext.withStatusCode(response.getStatusCode());
client.getChannel().writeAndFlush(response);
}
}
@@ -882,7 +892,7 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
}
ServerRPCAuditContext auditContext = new
ServerRPCAuditContext(auditLogger);
if (auditLogger != null) {
- auditContext.setCommand(command).setCreationTimeNs(System.nanoTime());
+ auditContext.withCommand(command).withCreationTimeNs(System.nanoTime());
}
return auditContext;
}