This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 6819c584f [MINOR] feat(coordinator/server): Support RPC from-address
to audit log and refactor RpcAuditContext (#2013)
6819c584f is described below
commit 6819c584fa06fa566353726a6f7bb06cc9571097
Author: maobaolong <[email protected]>
AuthorDate: Wed Aug 28 10:59:46 2024 +0800
[MINOR] feat(coordinator/server): Support RPC from-address to audit log and
refactor RpcAuditContext (#2013)
### What changes were proposed in this pull request?
- Add the RPC caller's address (`from`) to the audit log
- Refactor the RPC audit contexts: move the shared logic into an abstract `RpcAuditContext` in the common module
### Why are the changes needed?
Record the caller's address (`from`) for each RPC call to make troubleshooting easier.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Started the coordinator and shuffle servers and checked the RPC audit logs:
```
# coordinator_rpc_audit.log
[2024-08-05 19:12:19.532] cmd=registerApplicationInfo statusCode=SUCCESS from=/x.x.x.x:49416 executionTimeUs=1514 appId=app-20240805190949-0009_1722856187872 args{user=user}
# server_rpc_audit.log
[2024-08-05 19:12:21.511] cmd=requireBuffer statusCode=SUCCESS from=/x.x.x.x:43904 executionTimeUs=1294 appId=app-20240805190949-0009_1722856187872 shuffleId=0 args{requireSize=233, partitionIdsSize=1}
[2024-08-05 19:12:21.778] cmd=sendShuffleData statusCode=SUCCESS from=/x.x.x.x:39340 executionTimeUs=25464 appId=app-20240805190949-0009_1722856187872 shuffleId=0 args{requireBufferId=4, requireSize=233, isPreAllocated=true, requireBlocksSize=39, stageAttemptNumber=0}
```
---
.../uniffle/common/audit/RpcAuditContext.java | 86 ++++++-------
.../common/rpc/ClientContextServerInterceptor.java | 72 +++++++++++
.../org/apache/uniffle/common/rpc/GrpcServer.java | 1 +
.../coordinator/CoordinatorGrpcService.java | 45 ++++---
.../audit/CoordinatorRpcAuditContext.java | 30 ++++-
.../uniffle/server/ShuffleServerGrpcService.java | 48 ++++----
.../server/audit/ServerRPCAuditContext.java | 134 ---------------------
.../server/audit/ServerRpcAuditContext.java | 36 +++++-
.../server/netty/ShuffleServerNettyHandler.java | 28 +++--
9 files changed, 239 insertions(+), 241 deletions(-)
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRPCAuditContext.java
b/common/src/main/java/org/apache/uniffle/common/audit/RpcAuditContext.java
similarity index 60%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRPCAuditContext.java
rename to
common/src/main/java/org/apache/uniffle/common/audit/RpcAuditContext.java
index a66bd2805..fbe85efd0 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRPCAuditContext.java
+++ b/common/src/main/java/org/apache/uniffle/common/audit/RpcAuditContext.java
@@ -15,40 +15,38 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator.audit;
+package org.apache.uniffle.common.audit;
+
+import java.io.Closeable;
import org.slf4j.Logger;
-import org.apache.uniffle.common.audit.AuditContext;
import org.apache.uniffle.common.rpc.StatusCode;
-/** An audit context for coordinator rpc. */
-public class CoordinatorRPCAuditContext implements AuditContext {
+/** Context for rpc audit logging. */
+public abstract class RpcAuditContext implements Closeable {
private final Logger log;
private String command;
private String statusCode;
- private long creationTimeNs;
- private long executionTimeNs;
- private String appId = "N/A";
- private int shuffleId = -1;
private String args;
+ private String returnValue;
+ private String from;
+ private long creationTimeNs;
+ protected long executionTimeNs;
- /**
- * Constructor of {@link CoordinatorRPCAuditContext}.
- *
- * @param log the logger to log the audit information
- */
- public CoordinatorRPCAuditContext(Logger log) {
+ public RpcAuditContext(Logger log) {
this.log = log;
}
+ protected abstract String content();
+
/**
* Sets mCommand field.
*
* @param command the command associated with shuffle server rpc
- * @return this {@link AuditContext} instance
+ * @return this {@link RpcAuditContext} instance
*/
- public CoordinatorRPCAuditContext withCommand(String command) {
+ public RpcAuditContext withCommand(String command) {
this.command = command;
return this;
}
@@ -58,9 +56,9 @@ public class CoordinatorRPCAuditContext implements
AuditContext {
*
* @param creationTimeNs the System.nanoTime() when this operation create,
it only can be used to
* compute operation mExecutionTime
- * @return this {@link AuditContext} instance
+ * @return this {@link RpcAuditContext} instance
*/
- public CoordinatorRPCAuditContext withCreationTimeNs(long creationTimeNs) {
+ public RpcAuditContext withCreationTimeNs(long creationTimeNs) {
this.creationTimeNs = creationTimeNs;
return this;
}
@@ -69,9 +67,9 @@ public class CoordinatorRPCAuditContext implements
AuditContext {
* Sets status code field.
*
* @param statusCode the status code
- * @return this {@link AuditContext} instance
+ * @return this {@link RpcAuditContext} instance
*/
- public CoordinatorRPCAuditContext withStatusCode(StatusCode statusCode) {
+ public RpcAuditContext withStatusCode(StatusCode statusCode) {
if (statusCode == null) {
this.statusCode = "UNKNOWN";
} else {
@@ -84,10 +82,9 @@ public class CoordinatorRPCAuditContext implements
AuditContext {
* Sets status code field.
*
* @param statusCode the status code
- * @return this {@link AuditContext} instance
+ * @return this {@link RpcAuditContext} instance
*/
- public CoordinatorRPCAuditContext withStatusCode(
- org.apache.uniffle.proto.RssProtos.StatusCode statusCode) {
+ public RpcAuditContext
withStatusCode(org.apache.uniffle.proto.RssProtos.StatusCode statusCode) {
if (statusCode == null) {
this.statusCode = "UNKNOWN";
} else {
@@ -100,13 +97,28 @@ public class CoordinatorRPCAuditContext implements
AuditContext {
* Sets status code field.
*
* @param statusCode the status code
- * @return this {@link AuditContext} instance
+ * @return this {@link RpcAuditContext} instance
*/
- public CoordinatorRPCAuditContext withStatusCode(String statusCode) {
+ public RpcAuditContext withStatusCode(String statusCode) {
this.statusCode = statusCode;
return this;
}
+ public RpcAuditContext withArgs(String args) {
+ this.args = args;
+ return this;
+ }
+
+ public RpcAuditContext withReturnValue(String returnValue) {
+ this.returnValue = returnValue;
+ return this;
+ }
+
+ public RpcAuditContext withFrom(String from) {
+ this.from = from;
+ return this;
+ }
+
@Override
public void close() {
if (log == null) {
@@ -120,26 +132,14 @@ public class CoordinatorRPCAuditContext implements
AuditContext {
public String toString() {
String line =
String.format(
-
"cmd=%s\tstatusCode=%s\tappId=%s\tshuffleId=%s\texecutionTimeUs=%d\t",
- command, statusCode, appId, shuffleId, executionTimeNs / 1000);
+ "cmd=%s\tstatusCode=%s\tfrom=%s\texecutionTimeUs=%d\t%s",
+ command, statusCode, from, executionTimeNs / 1000, content());
if (args != null) {
- line += String.format("args{%s}", args);
+ line += String.format("\targs{%s}", args);
+ }
+ if (returnValue != null) {
+ line += String.format("\treturn{%s}", returnValue);
}
return line;
}
-
- public CoordinatorRPCAuditContext withAppId(String appId) {
- this.appId = appId;
- return this;
- }
-
- public CoordinatorRPCAuditContext withShuffleId(int shuffleId) {
- this.shuffleId = shuffleId;
- return this;
- }
-
- public CoordinatorRPCAuditContext withArgs(String args) {
- this.args = args;
- return this;
- }
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/rpc/ClientContextServerInterceptor.java
b/common/src/main/java/org/apache/uniffle/common/rpc/ClientContextServerInterceptor.java
new file mode 100644
index 000000000..e8c064a1c
--- /dev/null
+++
b/common/src/main/java/org/apache/uniffle/common/rpc/ClientContextServerInterceptor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.rpc;
+
+import javax.annotation.Nullable;
+
+import io.grpc.ForwardingServerCallListener;
+import io.grpc.Grpc;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+
+/**
+ * Server side interceptor that is used to put remote client's IP Address to
thread local storage.
+ */
+public class ClientContextServerInterceptor implements ServerInterceptor {
+
+ /**
+ * A {@link ThreadLocal} variable to maintain the client's IP address along
with a specific
+ * thread.
+ */
+ private static final ThreadLocal<String> IP_ADDRESS_THREAD_LOCAL = new
ThreadLocal<>();
+
+ /** @return IP address of the gRPC client that is making the call */
+ @Nullable public static String getIpAddress() {
+ return IP_ADDRESS_THREAD_LOCAL.get();
+ }
+
+ @Override
+ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
+ ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT,
RespT> next) {
+ /**
+ * For streaming calls, below will make sure remote IP address and client
version are injected
+ * prior to creating the stream.
+ */
+ setRemoteIpAddress(call);
+
+ /**
+ * For non-streaming calls to server, below listener will be invoked in
the same thread that is
+ * serving the call.
+ */
+ return new
ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
+ next.startCall(call, headers)) {
+ @Override
+ public void onHalfClose() {
+ setRemoteIpAddress(call);
+ super.onHalfClose();
+ }
+ };
+ }
+
+ private <ReqT, RespT> void setRemoteIpAddress(ServerCall<ReqT, RespT> call) {
+ String remoteIpAddress =
call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR).toString();
+ IP_ADDRESS_THREAD_LOCAL.set(remoteIpAddress);
+ }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
index cf1f05fea..0929fa248 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
@@ -118,6 +118,7 @@ public class GrpcServer implements ServerInterface {
servicesWithInterceptors.forEach(
(serviceWithInterceptors) -> {
List<ServerInterceptor> interceptors =
serviceWithInterceptors.getRight();
+ interceptors.add(new ClientContextServerInterceptor());
if (isMetricsEnabled) {
MonitoringServerInterceptor monitoringInterceptor =
new MonitoringServerInterceptor(grpcMetrics);
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index d618211e0..9f7c5cdd4 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -36,11 +36,12 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ServerStatus;
-import org.apache.uniffle.common.audit.AuditContext;
+import org.apache.uniffle.common.audit.RpcAuditContext;
+import org.apache.uniffle.common.rpc.ClientContextServerInterceptor;
import org.apache.uniffle.common.storage.StorageInfoUtils;
import org.apache.uniffle.coordinator.access.AccessCheckResult;
import org.apache.uniffle.coordinator.access.AccessInfo;
-import org.apache.uniffle.coordinator.audit.CoordinatorRPCAuditContext;
+import org.apache.uniffle.coordinator.audit.CoordinatorRpcAuditContext;
import org.apache.uniffle.coordinator.conf.RssClientConfFetchInfo;
import
org.apache.uniffle.coordinator.strategy.assignment.PartitionRangeAssignment;
import org.apache.uniffle.coordinator.util.CoordinatorUtils;
@@ -124,7 +125,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
public void getShuffleAssignments(
GetShuffleServerRequest request,
StreamObserver<GetShuffleAssignmentsResponse> responseObserver) {
- try (CoordinatorRPCAuditContext auditContext =
createAuditContext("getShuffleAssignments")) {
+ try (CoordinatorRpcAuditContext auditContext =
createAuditContext("getShuffleAssignments")) {
final String appId = request.getApplicationId();
final int shuffleId = request.getShuffleId();
final int partitionNum = request.getPartitionNum();
@@ -135,11 +136,12 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
final int estimateTaskConcurrency = request.getEstimateTaskConcurrency();
final Set<String> faultyServerIds = new
HashSet<>(request.getFaultyServerIdsList());
- auditContext.withAppId(appId).withShuffleId(shuffleId);
+ auditContext.withAppId(appId);
auditContext.withArgs(
String.format(
- "partitionNum=%d, partitionNumPerRange=%d, replica=%d,
requiredTags=%s, "
+ "shuffleId=%d, partitionNum=%d, partitionNumPerRange=%d,
replica=%d, requiredTags=%s, "
+ "requiredShuffleServerNumber=%d, faultyServerIds=%s,
stageId=%d, stageAttemptNumber=%d, isReassign=%b",
+ shuffleId,
partitionNum,
partitionNumPerRange,
replica,
@@ -216,7 +218,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
public void heartbeat(
ShuffleServerHeartBeatRequest request,
StreamObserver<ShuffleServerHeartBeatResponse> responseObserver) {
- try (CoordinatorRPCAuditContext auditContext =
createAuditContext("heartbeat")) {
+ try (CoordinatorRpcAuditContext auditContext =
createAuditContext("heartbeat")) {
final ServerNode serverNode = toServerNode(request);
auditContext.withArgs("serverNode=" + serverNode.getId());
coordinatorServer.getClusterManager().add(serverNode);
@@ -237,7 +239,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
@Override
public void checkServiceAvailable(
Empty request, StreamObserver<CheckServiceAvailableResponse>
responseObserver) {
- try (CoordinatorRPCAuditContext auditContext =
createAuditContext("checkServiceAvailable")) {
+ try (CoordinatorRpcAuditContext auditContext =
createAuditContext("checkServiceAvailable")) {
final CheckServiceAvailableResponse response =
CheckServiceAvailableResponse.newBuilder()
.setAvailable(coordinatorServer.getClusterManager().getNodesNum() > 0)
@@ -252,7 +254,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
public void reportClientOperation(
ReportShuffleClientOpRequest request,
StreamObserver<ReportShuffleClientOpResponse> responseObserver) {
- try (CoordinatorRPCAuditContext auditContext =
createAuditContext("reportClientOperation")) {
+ try (CoordinatorRpcAuditContext auditContext =
createAuditContext("reportClientOperation")) {
final String clientHost = request.getClientHost();
final int clientPort = request.getClientPort();
final ShuffleServerId shuffleServer = request.getServer();
@@ -275,7 +277,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
@Override
public void appHeartbeat(
AppHeartBeatRequest request, StreamObserver<AppHeartBeatResponse>
responseObserver) {
- try (CoordinatorRPCAuditContext auditContext =
createAuditContext("appHeartbeat")) {
+ try (CoordinatorRpcAuditContext auditContext =
createAuditContext("appHeartbeat")) {
String appId = request.getAppId();
auditContext.withAppId(appId);
coordinatorServer.getApplicationManager().refreshAppId(appId);
@@ -302,7 +304,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
@Override
public void registerApplicationInfo(
ApplicationInfoRequest request, StreamObserver<ApplicationInfoResponse>
responseObserver) {
- try (CoordinatorRPCAuditContext auditContext =
createAuditContext("registerApplicationInfo")) {
+ try (CoordinatorRpcAuditContext auditContext =
createAuditContext("registerApplicationInfo")) {
String appId = request.getAppId();
String user = request.getUser();
auditContext.withAppId(appId).withArgs("user=" + user);
@@ -332,7 +334,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
@Override
public void accessCluster(
AccessClusterRequest request, StreamObserver<AccessClusterResponse>
responseObserver) {
- try (CoordinatorRPCAuditContext auditContext =
createAuditContext("accessCluster")) {
+ try (CoordinatorRpcAuditContext auditContext =
createAuditContext("accessCluster")) {
StatusCode statusCode = StatusCode.SUCCESS;
AccessClusterResponse response;
AccessManager accessManager = coordinatorServer.getAccessManager();
@@ -376,7 +378,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
@Override
public void fetchClientConf(
Empty empty, StreamObserver<FetchClientConfResponse> responseObserver) {
- try (CoordinatorRPCAuditContext auditContext =
createAuditContext("fetchClientConf")) {
+ try (CoordinatorRpcAuditContext auditContext =
createAuditContext("fetchClientConf")) {
fetchClientConfImpl(RssClientConfFetchInfo.EMPTY_CLIENT_CONF_FETCH_INFO,
responseObserver);
auditContext.withStatusCode(StatusCode.SUCCESS);
}
@@ -385,7 +387,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
@Override
public void fetchClientConfV2(
FetchClientConfRequest request, StreamObserver<FetchClientConfResponse>
responseObserver) {
- try (CoordinatorRPCAuditContext auditContext =
createAuditContext("fetchClientConfV2")) {
+ try (CoordinatorRpcAuditContext auditContext =
createAuditContext("fetchClientConfV2")) {
fetchClientConfImpl(RssClientConfFetchInfo.fromProto(request),
responseObserver);
auditContext.withStatusCode(StatusCode.SUCCESS);
}
@@ -426,7 +428,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
public void fetchRemoteStorage(
FetchRemoteStorageRequest request,
StreamObserver<FetchRemoteStorageResponse> responseObserver) {
- try (CoordinatorRPCAuditContext auditContext =
createAuditContext("fetchRemoteStorage")) {
+ try (CoordinatorRpcAuditContext auditContext =
createAuditContext("fetchRemoteStorage")) {
FetchRemoteStorageResponse response;
StatusCode status = StatusCode.SUCCESS;
String appId = request.getAppId();
@@ -523,20 +525,23 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
}
/**
- * Creates a {@link CoordinatorRPCAuditContext} instance.
+ * Creates a {@link CoordinatorRpcAuditContext} instance.
*
- * @param command the command to be logged by this {@link AuditContext}
- * @return newly-created {@link CoordinatorRPCAuditContext} instance
+ * @param command the command to be logged by this {@link RpcAuditContext}
+ * @return newly-created {@link CoordinatorRpcAuditContext} instance
*/
- private CoordinatorRPCAuditContext createAuditContext(String command) {
+ private CoordinatorRpcAuditContext createAuditContext(String command) {
// Audit log may be enabled during runtime
Logger auditLogger = null;
if (isRpcAuditLogEnabled && !rpcAuditExcludeOpList.contains(command)) {
auditLogger = AUDIT_LOGGER;
}
- CoordinatorRPCAuditContext auditContext = new
CoordinatorRPCAuditContext(auditLogger);
+ CoordinatorRpcAuditContext auditContext = new
CoordinatorRpcAuditContext(auditLogger);
if (auditLogger != null) {
- auditContext.withCommand(command).withCreationTimeNs(System.nanoTime());
+ auditContext
+ .withCommand(command)
+ .withFrom(ClientContextServerInterceptor.getIpAddress())
+ .withCreationTimeNs(System.nanoTime());
}
return auditContext;
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/audit/AuditContext.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRpcAuditContext.java
similarity index 55%
copy from common/src/main/java/org/apache/uniffle/common/audit/AuditContext.java
copy to
coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRpcAuditContext.java
index 77fbb9374..a23ae2ba3 100644
--- a/common/src/main/java/org/apache/uniffle/common/audit/AuditContext.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRpcAuditContext.java
@@ -15,12 +15,32 @@
* limitations under the License.
*/
-package org.apache.uniffle.common.audit;
+package org.apache.uniffle.coordinator.audit;
-import java.io.Closeable;
+import org.slf4j.Logger;
+
+import org.apache.uniffle.common.audit.RpcAuditContext;
+
+/** An audit context for coordinator rpc. */
+public class CoordinatorRpcAuditContext extends RpcAuditContext {
+ private String appId = "N/A";
+
+ /**
+ * Constructor of {@link CoordinatorRpcAuditContext}.
+ *
+ * @param log the logger to log the audit information
+ */
+ public CoordinatorRpcAuditContext(Logger log) {
+ super(log);
+ }
-/** Context for audit logging. */
-public interface AuditContext extends Closeable {
@Override
- void close();
+ protected String content() {
+ return String.format("appId=%s", appId);
+ }
+
+ public CoordinatorRpcAuditContext withAppId(String appId) {
+ this.appId = appId;
+ return this;
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index b0fe9d9ef..5cce3a3a8 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -47,13 +47,14 @@ import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShufflePartitionedData;
-import org.apache.uniffle.common.audit.AuditContext;
+import org.apache.uniffle.common.audit.RpcAuditContext;
import org.apache.uniffle.common.config.RssBaseConf;
import
org.apache.uniffle.common.exception.ExceedHugePartitionHardLimitException;
import org.apache.uniffle.common.exception.FileNotFoundException;
import org.apache.uniffle.common.exception.NoBufferException;
import org.apache.uniffle.common.exception.NoBufferForHugePartitionException;
import org.apache.uniffle.common.exception.NoRegisterException;
+import org.apache.uniffle.common.rpc.ClientContextServerInterceptor;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.BlockIdLayout;
import org.apache.uniffle.common.util.ByteBufUtils;
@@ -90,7 +91,7 @@ import
org.apache.uniffle.proto.RssProtos.ShufflePartitionRange;
import org.apache.uniffle.proto.RssProtos.ShuffleRegisterRequest;
import org.apache.uniffle.proto.RssProtos.ShuffleRegisterResponse;
import org.apache.uniffle.proto.ShuffleServerGrpc.ShuffleServerImplBase;
-import org.apache.uniffle.server.audit.ServerRPCAuditContext;
+import org.apache.uniffle.server.audit.ServerRpcAuditContext;
import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.common.StorageReadMetrics;
@@ -125,7 +126,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
public void unregisterShuffleByAppId(
RssProtos.ShuffleUnregisterByAppIdRequest request,
StreamObserver<RssProtos.ShuffleUnregisterByAppIdResponse>
responseStreamObserver) {
- try (ServerRPCAuditContext auditContext =
createAuditContext("unregisterShuffleByAppId")) {
+ try (ServerRpcAuditContext auditContext =
createAuditContext("unregisterShuffleByAppId")) {
String appId = request.getAppId();
auditContext.withAppId(appId);
StatusCode status = verifyRequest(appId);
@@ -163,7 +164,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
public void unregisterShuffle(
RssProtos.ShuffleUnregisterRequest request,
StreamObserver<RssProtos.ShuffleUnregisterResponse>
responseStreamObserver) {
- try (ServerRPCAuditContext auditContext =
createAuditContext("unregisterShuffle")) {
+ try (ServerRpcAuditContext auditContext =
createAuditContext("unregisterShuffle")) {
String appId = request.getAppId();
int shuffleId = request.getShuffleId();
auditContext.withAppId(appId).withShuffleId(shuffleId);
@@ -200,7 +201,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
@Override
public void registerShuffle(
ShuffleRegisterRequest req, StreamObserver<ShuffleRegisterResponse>
responseObserver) {
- try (ServerRPCAuditContext auditContext =
createAuditContext("registerShuffle")) {
+ try (ServerRpcAuditContext auditContext =
createAuditContext("registerShuffle")) {
ShuffleRegisterResponse reply;
String appId = req.getAppId();
int shuffleId = req.getShuffleId();
@@ -306,7 +307,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
@Override
public void sendShuffleData(
SendShuffleDataRequest req, StreamObserver<SendShuffleDataResponse>
responseObserver) {
- try (ServerRPCAuditContext auditContext =
createAuditContext("sendShuffleData")) {
+ try (ServerRpcAuditContext auditContext =
createAuditContext("sendShuffleData")) {
SendShuffleDataResponse reply;
String appId = req.getAppId();
int shuffleId = req.getShuffleId();
@@ -519,7 +520,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
@Override
public void commitShuffleTask(
ShuffleCommitRequest req, StreamObserver<ShuffleCommitResponse>
responseObserver) {
- try (ServerRPCAuditContext auditContext =
createAuditContext("commitShuffleTask")) {
+ try (ServerRpcAuditContext auditContext =
createAuditContext("commitShuffleTask")) {
String appId = req.getAppId();
int shuffleId = req.getShuffleId();
auditContext.withAppId(appId).withShuffleId(shuffleId);
@@ -576,7 +577,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
@Override
public void finishShuffle(
FinishShuffleRequest req, StreamObserver<FinishShuffleResponse>
responseObserver) {
- try (ServerRPCAuditContext auditContext =
createAuditContext("finishShuffle")) {
+ try (ServerRpcAuditContext auditContext =
createAuditContext("finishShuffle")) {
String appId = req.getAppId();
int shuffleId = req.getShuffleId();
auditContext.withAppId(appId).withShuffleId(shuffleId);
@@ -624,7 +625,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
@Override
public void requireBuffer(
RequireBufferRequest request, StreamObserver<RequireBufferResponse>
responseObserver) {
- try (ServerRPCAuditContext auditContext =
createAuditContext("requireBuffer")) {
+ try (ServerRpcAuditContext auditContext =
createAuditContext("requireBuffer")) {
String appId = request.getAppId();
auditContext.withAppId(appId).withShuffleId(request.getShuffleId());
String auditArgs = "requireSize=" + request.getRequireSize();
@@ -704,7 +705,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
@Override
public void appHeartbeat(
AppHeartBeatRequest request, StreamObserver<AppHeartBeatResponse>
responseObserver) {
- try (ServerRPCAuditContext auditContext =
createAuditContext("appHeartbeat")) {
+ try (ServerRpcAuditContext auditContext =
createAuditContext("appHeartbeat")) {
String appId = request.getAppId();
auditContext.withAppId(appId);
StatusCode status = verifyRequest(appId);
@@ -745,7 +746,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
public void reportShuffleResult(
ReportShuffleResultRequest request,
StreamObserver<ReportShuffleResultResponse> responseObserver) {
- try (ServerRPCAuditContext auditContext =
createAuditContext("reportShuffleResult")) {
+ try (ServerRpcAuditContext auditContext =
createAuditContext("reportShuffleResult")) {
String appId = request.getAppId();
int shuffleId = request.getShuffleId();
long taskAttemptId = request.getTaskAttemptId();
@@ -825,7 +826,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
@Override
public void getShuffleResult(
GetShuffleResultRequest request,
StreamObserver<GetShuffleResultResponse> responseObserver) {
- try (ServerRPCAuditContext auditContext =
createAuditContext("getShuffleResult")) {
+ try (ServerRpcAuditContext auditContext =
createAuditContext("getShuffleResult")) {
String appId = request.getAppId();
int shuffleId = request.getShuffleId();
int partitionId = request.getPartitionId();
@@ -892,7 +893,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
public void getShuffleResultForMultiPart(
GetShuffleResultForMultiPartRequest request,
StreamObserver<GetShuffleResultForMultiPartResponse> responseObserver) {
- try (ServerRPCAuditContext auditContext =
createAuditContext("getShuffleResultForMultiPart")) {
+ try (ServerRpcAuditContext auditContext =
createAuditContext("getShuffleResultForMultiPart")) {
String appId = request.getAppId();
int shuffleId = request.getShuffleId();
List<Integer> partitionsList = request.getPartitionsList();
@@ -962,7 +963,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
public void getLocalShuffleData(
GetLocalShuffleDataRequest request,
StreamObserver<GetLocalShuffleDataResponse> responseObserver) {
- try (ServerRPCAuditContext auditContext =
createAuditContext("getLocalShuffleData")) {
+ try (ServerRpcAuditContext auditContext =
createAuditContext("getLocalShuffleData")) {
String appId = request.getAppId();
int shuffleId = request.getShuffleId();
int partitionId = request.getPartitionId();
@@ -1108,7 +1109,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
public void getLocalShuffleIndex(
GetLocalShuffleIndexRequest request,
StreamObserver<GetLocalShuffleIndexResponse> responseObserver) {
- try (ServerRPCAuditContext auditContext =
createAuditContext("getLocalShuffleIndex")) {
+ try (ServerRpcAuditContext auditContext =
createAuditContext("getLocalShuffleIndex")) {
String appId = request.getAppId();
int shuffleId = request.getShuffleId();
int partitionId = request.getPartitionId();
@@ -1232,7 +1233,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
public void getMemoryShuffleData(
GetMemoryShuffleDataRequest request,
StreamObserver<GetMemoryShuffleDataResponse> responseObserver) {
- try (ServerRPCAuditContext auditContext =
createAuditContext("getMemoryShuffleData")) {
+ try (ServerRpcAuditContext auditContext =
createAuditContext("getMemoryShuffleData")) {
String appId = request.getAppId();
int shuffleId = request.getShuffleId();
int partitionId = request.getPartitionId();
@@ -1450,20 +1451,23 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
}
/**
- * Creates a {@link ServerRPCAuditContext} instance.
+ * Creates a {@link ServerRpcAuditContext} instance.
*
- * @param command the command to be logged by this {@link AuditContext}
- * @return newly-created {@link ServerRPCAuditContext} instance
+ * @param command the command to be logged by this {@link RpcAuditContext}
+ * @return newly-created {@link ServerRpcAuditContext} instance
*/
- private ServerRPCAuditContext createAuditContext(String command) {
+ private ServerRpcAuditContext createAuditContext(String command) {
// Audit log may be enabled during runtime
Logger auditLogger = null;
if (isRpcAuditLogEnabled && !rpcAuditExcludeOpList.contains(command)) {
auditLogger = AUDIT_LOGGER;
}
- ServerRPCAuditContext auditContext = new
ServerRPCAuditContext(auditLogger);
+ ServerRpcAuditContext auditContext = new
ServerRpcAuditContext(auditLogger);
if (auditLogger != null) {
- auditContext.withCommand(command).withCreationTimeNs(System.nanoTime());
+ auditContext
+ .withCommand(command)
+ .withFrom(ClientContextServerInterceptor.getIpAddress())
+ .withCreationTimeNs(System.nanoTime());
}
return auditContext;
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/audit/ServerRPCAuditContext.java
b/server/src/main/java/org/apache/uniffle/server/audit/ServerRPCAuditContext.java
deleted file mode 100644
index b78c606a5..000000000
---
a/server/src/main/java/org/apache/uniffle/server/audit/ServerRPCAuditContext.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.server.audit;
-
-import org.slf4j.Logger;
-
-import org.apache.uniffle.common.audit.AuditContext;
-import org.apache.uniffle.common.rpc.StatusCode;
-
-/** An audit context for shuffle server rpc. */
-public class ServerRPCAuditContext implements AuditContext {
- private final Logger log;
- private String command;
- private String statusCode;
- private long creationTimeNs;
- private long executionTimeNs;
- private String appId = "N/A";
- private int shuffleId = -1;
- private String args;
- private String returnValue;
-
- /**
- * Constructor of {@link ServerRPCAuditContext}.
- *
- * @param log the logger to log the audit information
- */
- public ServerRPCAuditContext(Logger log) {
- this.log = log;
- }
-
- /**
- * Sets mCommand field.
- *
- * @param command the command associated with shuffle server rpc
- * @return this {@link AuditContext} instance
- */
- public ServerRPCAuditContext withCommand(String command) {
- this.command = command;
- return this;
- }
-
- /**
- * Sets creationTimeNs field.
- *
- * @param creationTimeNs the System.nanoTime() when this operation create,
it only can be used to
- * compute operation mExecutionTime
- * @return this {@link AuditContext} instance
- */
- public ServerRPCAuditContext withCreationTimeNs(long creationTimeNs) {
- this.creationTimeNs = creationTimeNs;
- return this;
- }
-
- /**
- * Sets status code field.
- *
- * @param statusCode the status code
- * @return this {@link AuditContext} instance
- */
- public ServerRPCAuditContext withStatusCode(StatusCode statusCode) {
- this.statusCode = statusCode.name();
- return this;
- }
-
- /**
- * Sets status code field.
- *
- * @param statusCode the status code
- * @return this {@link AuditContext} instance
- */
- public ServerRPCAuditContext withStatusCode(String statusCode) {
- this.statusCode = statusCode;
- return this;
- }
-
- @Override
- public void close() {
- if (log == null) {
- return;
- }
- executionTimeNs = System.nanoTime() - creationTimeNs;
- log.info(toString());
- }
-
- @Override
- public String toString() {
- String line =
- String.format(
-
"cmd=%s\tstatusCode=%s\tappId=%s\tshuffleId=%s\texecutionTimeUs=%d",
- command, statusCode, appId, shuffleId, executionTimeNs / 1000);
- if (args != null) {
- line += String.format("\targs{%s}", args);
- }
- if (returnValue != null) {
- line += String.format("\treturn{%s}", returnValue);
- }
- return line;
- }
-
- public ServerRPCAuditContext withAppId(String appId) {
- this.appId = appId;
- return this;
- }
-
- public ServerRPCAuditContext withShuffleId(int shuffleId) {
- this.shuffleId = shuffleId;
- return this;
- }
-
- public ServerRPCAuditContext withArgs(String args) {
- this.args = args;
- return this;
- }
-
- public ServerRPCAuditContext withReturnValue(String returnValue) {
- this.returnValue = returnValue;
- return this;
- }
-}
diff --git
a/common/src/main/java/org/apache/uniffle/common/audit/AuditContext.java
b/server/src/main/java/org/apache/uniffle/server/audit/ServerRpcAuditContext.java
similarity index 50%
rename from
common/src/main/java/org/apache/uniffle/common/audit/AuditContext.java
rename to
server/src/main/java/org/apache/uniffle/server/audit/ServerRpcAuditContext.java
index 77fbb9374..00c98f518 100644
--- a/common/src/main/java/org/apache/uniffle/common/audit/AuditContext.java
+++
b/server/src/main/java/org/apache/uniffle/server/audit/ServerRpcAuditContext.java
@@ -15,12 +15,38 @@
* limitations under the License.
*/
-package org.apache.uniffle.common.audit;
+package org.apache.uniffle.server.audit;
-import java.io.Closeable;
+import org.slf4j.Logger;
+
+import org.apache.uniffle.common.audit.RpcAuditContext;
+
+/** An audit context for shuffle server rpc. */
+public class ServerRpcAuditContext extends RpcAuditContext {
+ private String appId = "N/A";
+ private int shuffleId = -1;
+
+ /**
+ * Constructor of {@link ServerRpcAuditContext}.
+ *
+ * @param log the logger to log the audit information
+ */
+ public ServerRpcAuditContext(Logger log) {
+ super(log);
+ }
-/** Context for audit logging. */
-public interface AuditContext extends Closeable {
@Override
- void close();
+ protected String content() {
+ return String.format("appId=%s\tshuffleId=%s", appId, shuffleId);
+ }
+
+ public ServerRpcAuditContext withAppId(String appId) {
+ this.appId = appId;
+ return this;
+ }
+
+ public ServerRpcAuditContext withShuffleId(int shuffleId) {
+ this.shuffleId = shuffleId;
+ return this;
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index fdb723dfb..fb4d824d9 100644
---
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -40,7 +40,7 @@ import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShufflePartitionedData;
-import org.apache.uniffle.common.audit.AuditContext;
+import org.apache.uniffle.common.audit.RpcAuditContext;
import org.apache.uniffle.common.config.RssBaseConf;
import
org.apache.uniffle.common.exception.ExceedHugePartitionHardLimitException;
import org.apache.uniffle.common.exception.FileNotFoundException;
@@ -65,7 +65,7 @@ import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.ShuffleServerMetrics;
import org.apache.uniffle.server.ShuffleTaskInfo;
import org.apache.uniffle.server.ShuffleTaskManager;
-import org.apache.uniffle.server.audit.ServerRPCAuditContext;
+import org.apache.uniffle.server.audit.ServerRpcAuditContext;
import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.storage.common.Storage;
@@ -121,7 +121,7 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
}
public void handleSendShuffleDataRequest(TransportClient client,
SendShuffleDataRequest req) {
- try (ServerRPCAuditContext auditContext =
createAuditContext("sendShuffleData")) {
+ try (ServerRpcAuditContext auditContext =
createAuditContext("sendShuffleData", client)) {
RpcResponse rpcResponse;
String appId = req.getAppId();
int shuffleId = req.getShuffleId();
@@ -386,7 +386,7 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
public void handleGetMemoryShuffleDataRequest(
TransportClient client, GetMemoryShuffleDataRequest req) {
- try (ServerRPCAuditContext auditContext =
createAuditContext("getMemoryShuffleData")) {
+ try (ServerRpcAuditContext auditContext =
createAuditContext("getMemoryShuffleData", client)) {
String appId = req.getAppId();
int shuffleId = req.getShuffleId();
int partitionId = req.getPartitionId();
@@ -497,7 +497,7 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
public void handleGetLocalShuffleIndexRequest(
TransportClient client, GetLocalShuffleIndexRequest req) {
- try (ServerRPCAuditContext auditContext =
createAuditContext("getLocalShuffleIndex")) {
+ try (ServerRpcAuditContext auditContext =
createAuditContext("getLocalShuffleIndex", client)) {
String appId = req.getAppId();
int shuffleId = req.getShuffleId();
int partitionId = req.getPartitionId();
@@ -607,7 +607,7 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
}
public void handleGetLocalShuffleData(TransportClient client,
GetLocalShuffleDataRequest req) {
- try (ServerRPCAuditContext auditContext =
createAuditContext("getLocalShuffleData")) {
+ try (ServerRpcAuditContext auditContext =
createAuditContext("getLocalShuffleData", client)) {
GetLocalShuffleDataResponse response;
String appId = req.getAppId();
int shuffleId = req.getShuffleId();
@@ -879,20 +879,24 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
}
/**
- * Creates a {@link ServerRPCAuditContext} instance.
+ * Creates a {@link ServerRpcAuditContext} instance.
*
- * @param command the command to be logged by this {@link AuditContext}
- * @return newly-created {@link ServerRPCAuditContext} instance
+ * @param command the command to be logged by this {@link RpcAuditContext}
+ * @return newly-created {@link ServerRpcAuditContext} instance
*/
- private ServerRPCAuditContext createAuditContext(String command) {
+ private ServerRpcAuditContext createAuditContext(
+ String command, TransportClient transportClient) {
// Audit log may be enabled during runtime
Logger auditLogger = null;
if (isRpcAuditLogEnabled && !rpcAuditExcludeOpList.contains(command)) {
auditLogger = AUDIT_LOGGER;
}
- ServerRPCAuditContext auditContext = new
ServerRPCAuditContext(auditLogger);
+ ServerRpcAuditContext auditContext = new
ServerRpcAuditContext(auditLogger);
if (auditLogger != null) {
- auditContext.withCommand(command).withCreationTimeNs(System.nanoTime());
+ auditContext
+ .withCommand(command)
+ .withFrom(transportClient.getSocketAddress().toString())
+ .withCreationTimeNs(System.nanoTime());
}
return auditContext;
}