This is an automated email from the ASF dual-hosted git repository.
rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 658ca1f79 Support coordinator RPC audit log (#1979)
658ca1f79 is described below
commit 658ca1f795c148e29098c12b77701e0da1e4206c
Author: maobaolong <[email protected]>
AuthorDate: Thu Aug 1 10:55:08 2024 +0800
Support coordinator RPC audit log (#1979)
### What changes were proposed in this pull request?
Support recording audit log for coordinator rpc operation.
### Why are the changes needed?
Fix: #1980
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Start coordinator and server, watch the new `coordinator_rpc_audit.log`
```
[2024-07-29 17:20:18.986] cmd=fetchClientConfV2 statusCode=SUCCESS
appId=N/A shuffleId=-1 executionTimeUs=6130
[2024-07-29 17:20:20.900] cmd=heartbeat statusCode=SUCCESS appId=N/A
shuffleId=-1 executionTimeUs=618 args{serverNode=XXX-19979}
[2024-07-29 17:20:51.716] cmd=getShuffleAssignments statusCode=SUCCESS
appId=app-20240729172021-0002_1722244818120 shuffleId=0
executionTimeUs=18097 args{partitionNum=2, partitionNumPerRange=1, replica=1,
requiredTags=[ss_v5, GRPC], requiredShuffleServerNumber=-1, faultyServerIds=[],
stageId=-1, stageAttemptNumber=0, isReassign=false}
[2024-07-29 17:20:51.981] cmd=registerApplicationInfo statusCode=SUCCESS
appId=app-20240729172021-0002_1722244818120 shuffleId=-1
executionTimeUs=4142 args{user=user}
[2024-07-29 18:13:44.283] cmd=appHeartbeat statusCode=SUCCESS
appId=app-20240729173334-0000_1722245612411 shuffleId=-1
executionTimeUs=339
```
---
bin/start-coordinator.sh | 3 +-
conf/local_dev/log4j2.xml | 10 +
conf/log4j2.xml | 10 +
.../uniffle/coordinator/CoordinatorConf.java | 6 +
.../coordinator/CoordinatorGrpcService.java | 455 ++++++++++++---------
.../audit/CoordinatorRPCAuditContext.java | 145 +++++++
.../uniffle/coordinator/util/CoordinatorUtils.java | 5 +-
docs/coordinator_guide.md | 55 +--
8 files changed, 477 insertions(+), 212 deletions(-)
diff --git a/bin/start-coordinator.sh b/bin/start-coordinator.sh
index 05bc4c104..2e790f8a0 100755
--- a/bin/start-coordinator.sh
+++ b/bin/start-coordinator.sh
@@ -30,6 +30,7 @@ COORDINATOR_CONF_FILE="${RSS_CONF_DIR}/coordinator.conf"
JAR_DIR="${RSS_HOME}/jars"
LOG_CONF_FILE="${RSS_CONF_DIR}/log4j2.xml"
LOG_PATH="${RSS_LOG_DIR}/coordinator.log"
+COORDINATOR_RPC_AUDIT_LOG_PATH="${RSS_LOG_DIR}/coordinator_rpc_audit.log"
MAIN_CLASS="org.apache.uniffle.coordinator.CoordinatorServer"
@@ -96,7 +97,7 @@ GC_LOG_ARGS_NEW=" -XX:+IgnoreUnrecognizedVMOptions \
JVM_LOG_ARGS=""
if [ -f ${LOG_CONF_FILE} ]; then
- JVM_LOG_ARGS=" -Dlog4j2.configurationFile=file:${LOG_CONF_FILE}
-Dlog.path=${LOG_PATH}"
+ JVM_LOG_ARGS=" -Dlog4j2.configurationFile=file:${LOG_CONF_FILE}
-Dlog.path=${LOG_PATH}
-Dcoordinator.rpc.audit.log.path=${COORDINATOR_RPC_AUDIT_LOG_PATH}"
else
echo "Exit with error: ${LOG_CONF_FILE} file doesn't exist."
exit 1
diff --git a/conf/local_dev/log4j2.xml b/conf/local_dev/log4j2.xml
index 6e6eb118c..9c062a429 100644
--- a/conf/local_dev/log4j2.xml
+++ b/conf/local_dev/log4j2.xml
@@ -41,6 +41,13 @@
</Policies>
<DefaultRolloverStrategy max="10"/>
</RollingFile>
+ <RollingFile name="coordinatorRpcAuditAppender"
fileName="${sys:coordinator.rpc.audit.log.path}"
filePattern="${sys:coordinator.rpc.audit.log.path}.%i">
+ <PatternLayout pattern="[%d{yyyy-MM-dd HH:mm:ss.SSS}] %m%n"/>
+ <Policies>
+ <SizeBasedTriggeringPolicy size="2GB"/>
+ </Policies>
+ <DefaultRolloverStrategy max="10"/>
+ </RollingFile>
</Appenders>
<Loggers>
<Root level="info">
@@ -65,5 +72,8 @@
<Logger name="SHUFFLE_SERVER_RPC_AUDIT_LOG" level="INFO"
additivity="false">
<AppenderRef ref="shuffleServerRpcAuditAppender"/>
</Logger>
+ <Logger name="COORDINATOR_RPC_AUDIT_LOG" level="INFO" additivity="false">
+ <AppenderRef ref="coordinatorRpcAuditAppender"/>
+ </Logger>
</Loggers>
</Configuration>
diff --git a/conf/log4j2.xml b/conf/log4j2.xml
index 6e6eb118c..9c062a429 100644
--- a/conf/log4j2.xml
+++ b/conf/log4j2.xml
@@ -41,6 +41,13 @@
</Policies>
<DefaultRolloverStrategy max="10"/>
</RollingFile>
+ <RollingFile name="coordinatorRpcAuditAppender"
fileName="${sys:coordinator.rpc.audit.log.path}"
filePattern="${sys:coordinator.rpc.audit.log.path}.%i">
+ <PatternLayout pattern="[%d{yyyy-MM-dd HH:mm:ss.SSS}] %m%n"/>
+ <Policies>
+ <SizeBasedTriggeringPolicy size="2GB"/>
+ </Policies>
+ <DefaultRolloverStrategy max="10"/>
+ </RollingFile>
</Appenders>
<Loggers>
<Root level="info">
@@ -65,5 +72,8 @@
<Logger name="SHUFFLE_SERVER_RPC_AUDIT_LOG" level="INFO"
additivity="false">
<AppenderRef ref="shuffleServerRpcAuditAppender"/>
</Logger>
+ <Logger name="COORDINATOR_RPC_AUDIT_LOG" level="INFO" additivity="false">
+ <AppenderRef ref="coordinatorRpcAuditAppender"/>
+ </Logger>
</Loggers>
</Configuration>
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
index e68b4cbb0..6ed93edce 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
@@ -244,6 +244,12 @@ public class CoordinatorConf extends RssBaseConf {
.defaultValue(1000)
.withDescription(
"The max number of clients that communicating with nodes and
storing in the cache.");
+ public static final ConfigOption<Boolean> COORDINATOR_RPC_AUDIT_LOG_ENABLED =
+ ConfigOptions.key("rss.coordinator.rpc.audit.log.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "When set to true, for auditing purposes, the coordinator will
log audit records for every rpc request operation. ");
public CoordinatorConf() {}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index 2b6c1c428..a4376576d 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -35,9 +35,11 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ServerStatus;
+import org.apache.uniffle.common.audit.AuditContext;
import org.apache.uniffle.common.storage.StorageInfoUtils;
import org.apache.uniffle.coordinator.access.AccessCheckResult;
import org.apache.uniffle.coordinator.access.AccessInfo;
+import org.apache.uniffle.coordinator.audit.CoordinatorRPCAuditContext;
import org.apache.uniffle.coordinator.conf.RssClientConfFetchInfo;
import
org.apache.uniffle.coordinator.strategy.assignment.PartitionRangeAssignment;
import org.apache.uniffle.coordinator.util.CoordinatorUtils;
@@ -71,11 +73,17 @@ import org.apache.uniffle.proto.RssProtos.StatusCode;
public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorServerImplBase {
private static final Logger LOG =
LoggerFactory.getLogger(CoordinatorGrpcService.class);
+ private static final Logger AUDIT_LOGGER =
LoggerFactory.getLogger("COORDINATOR_RPC_AUDIT_LOG");
private final CoordinatorServer coordinatorServer;
+ private final boolean isRpcAuditLogEnabled;
public CoordinatorGrpcService(CoordinatorServer coordinatorServer) {
this.coordinatorServer = coordinatorServer;
+ isRpcAuditLogEnabled =
+ coordinatorServer
+ .getCoordinatorConf()
+ .getBoolean(CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_ENABLED);
}
@Override
@@ -106,71 +114,91 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
public void getShuffleAssignments(
GetShuffleServerRequest request,
StreamObserver<GetShuffleAssignmentsResponse> responseObserver) {
- final String appId = request.getApplicationId();
- final int shuffleId = request.getShuffleId();
- final int partitionNum = request.getPartitionNum();
- final int partitionNumPerRange = request.getPartitionNumPerRange();
- final int replica = request.getDataReplica();
- final Set<String> requiredTags =
Sets.newHashSet(request.getRequireTagsList());
- final int requiredShuffleServerNumber =
request.getAssignmentShuffleServerNumber();
- final int estimateTaskConcurrency = request.getEstimateTaskConcurrency();
- final Set<String> faultyServerIds = new
HashSet<>(request.getFaultyServerIdsList());
-
- LOG.info(
- "Request of getShuffleAssignments for appId[{}], shuffleId[{}],
partitionNum[{}],"
- + " partitionNumPerRange[{}], replica[{}], requiredTags[{}],
requiredShuffleServerNumber[{}],"
- + " faultyServerIds[{}], stageId[{}], stageAttemptNumber[{}],
isReassign[{}]",
- appId,
- shuffleId,
- partitionNum,
- partitionNumPerRange,
- replica,
- requiredTags,
- requiredShuffleServerNumber,
- faultyServerIds.size(),
- request.getStageId(),
- request.getStageAttemptNumber(),
- request.getReassign());
-
- GetShuffleAssignmentsResponse response;
- try {
- if (!coordinatorServer.getClusterManager().isReadyForServe()) {
- throw new Exception("Coordinator is out-of-service when in starting.");
- }
-
- final PartitionRangeAssignment pra =
- coordinatorServer
- .getAssignmentStrategy()
- .assign(
- partitionNum,
- partitionNumPerRange,
- replica,
- requiredTags,
- requiredShuffleServerNumber,
- estimateTaskConcurrency,
- faultyServerIds);
- response = CoordinatorUtils.toGetShuffleAssignmentsResponse(pra);
- logAssignmentResult(appId, shuffleId, pra);
- responseObserver.onNext(response);
- } catch (Exception e) {
- LOG.error(
- "Errors on getting shuffle assignments for app: {}, shuffleId: {},
partitionNum: {}, "
- + "partitionNumPerRange: {}, replica: {}, requiredTags: {}",
+ try (CoordinatorRPCAuditContext auditContext =
createAuditContext("getShuffleAssignments")) {
+ final String appId = request.getApplicationId();
+ final int shuffleId = request.getShuffleId();
+ final int partitionNum = request.getPartitionNum();
+ final int partitionNumPerRange = request.getPartitionNumPerRange();
+ final int replica = request.getDataReplica();
+ final Set<String> requiredTags =
Sets.newHashSet(request.getRequireTagsList());
+ final int requiredShuffleServerNumber =
request.getAssignmentShuffleServerNumber();
+ final int estimateTaskConcurrency = request.getEstimateTaskConcurrency();
+ final Set<String> faultyServerIds = new
HashSet<>(request.getFaultyServerIdsList());
+
+ auditContext.setAppId(appId).setShuffleId(shuffleId);
+ auditContext.setArgs(
+ String.format(
+ "partitionNum=%d, partitionNumPerRange=%d, replica=%d,
requiredTags=%s, "
+ + "requiredShuffleServerNumber=%d, faultyServerIds=%s,
stageId=%d, stageAttemptNumber=%d, isReassign=%b",
+ partitionNum,
+ partitionNumPerRange,
+ replica,
+ requiredTags,
+ requiredShuffleServerNumber,
+ faultyServerIds,
+ request.getStageId(),
+ request.getStageAttemptNumber(),
+ request.getReassign()));
+
+ LOG.info(
+ "Request of getShuffleAssignments for appId[{}], shuffleId[{}],
partitionNum[{}],"
+ + " partitionNumPerRange[{}], replica[{}], requiredTags[{}],
requiredShuffleServerNumber[{}],"
+ + " faultyServerIds[{}], stageId[{}], stageAttemptNumber[{}],
isReassign[{}]",
appId,
shuffleId,
partitionNum,
partitionNumPerRange,
replica,
requiredTags,
- e);
- response =
- GetShuffleAssignmentsResponse.newBuilder()
- .setStatus(StatusCode.INTERNAL_ERROR)
- .setRetMsg(e.getMessage())
- .build();
- responseObserver.onNext(response);
- } finally {
- responseObserver.onCompleted();
+ requiredShuffleServerNumber,
+ faultyServerIds.size(),
+ request.getStageId(),
+ request.getStageAttemptNumber(),
+ request.getReassign());
+
+ GetShuffleAssignmentsResponse response = null;
+ try {
+ if (!coordinatorServer.getClusterManager().isReadyForServe()) {
+ throw new Exception("Coordinator is out-of-service when in
starting.");
+ }
+
+ final PartitionRangeAssignment pra =
+ coordinatorServer
+ .getAssignmentStrategy()
+ .assign(
+ partitionNum,
+ partitionNumPerRange,
+ replica,
+ requiredTags,
+ requiredShuffleServerNumber,
+ estimateTaskConcurrency,
+ faultyServerIds);
+ response = CoordinatorUtils.toGetShuffleAssignmentsResponse(pra);
+ logAssignmentResult(appId, shuffleId, pra);
+ responseObserver.onNext(response);
+ } catch (Exception e) {
+ LOG.error(
+ "Errors on getting shuffle assignments for app: {}, shuffleId: {},
partitionNum: {}, "
+ + "partitionNumPerRange: {}, replica: {}, requiredTags: {}",
+ appId,
+ shuffleId,
+ partitionNum,
+ partitionNumPerRange,
+ replica,
+ requiredTags,
+ e);
+ response =
+ GetShuffleAssignmentsResponse.newBuilder()
+ .setStatus(StatusCode.INTERNAL_ERROR)
+ .setRetMsg(e.getMessage())
+ .build();
+ responseObserver.onNext(response);
+ } finally {
+ if (response != null) {
+ auditContext.setStatusCode(response.getStatus());
+ }
+ responseObserver.onCompleted();
+ }
}
}
@@ -178,141 +206,177 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
public void heartbeat(
ShuffleServerHeartBeatRequest request,
StreamObserver<ShuffleServerHeartBeatResponse> responseObserver) {
- final ServerNode serverNode = toServerNode(request);
- coordinatorServer.getClusterManager().add(serverNode);
- final ShuffleServerHeartBeatResponse response =
- ShuffleServerHeartBeatResponse.newBuilder()
- .setRetMsg("")
- .setStatus(StatusCode.SUCCESS)
- .build();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got heartbeat from {}", serverNode);
+ try (CoordinatorRPCAuditContext auditContext =
createAuditContext("heartbeat")) {
+ final ServerNode serverNode = toServerNode(request);
+ auditContext.setArgs("serverNode=" + serverNode.getId());
+ coordinatorServer.getClusterManager().add(serverNode);
+ final ShuffleServerHeartBeatResponse response =
+ ShuffleServerHeartBeatResponse.newBuilder()
+ .setRetMsg("")
+ .setStatus(StatusCode.SUCCESS)
+ .build();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got heartbeat from {}", serverNode);
+ }
+ auditContext.setStatusCode(response.getStatus());
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
}
- responseObserver.onNext(response);
- responseObserver.onCompleted();
}
@Override
public void checkServiceAvailable(
Empty request, StreamObserver<CheckServiceAvailableResponse>
responseObserver) {
- final CheckServiceAvailableResponse response =
- CheckServiceAvailableResponse.newBuilder()
- .setAvailable(coordinatorServer.getClusterManager().getNodesNum()
> 0)
- .build();
- responseObserver.onNext(response);
- responseObserver.onCompleted();
+ try (CoordinatorRPCAuditContext auditContext =
createAuditContext("checkServiceAvailable")) {
+ final CheckServiceAvailableResponse response =
+ CheckServiceAvailableResponse.newBuilder()
+
.setAvailable(coordinatorServer.getClusterManager().getNodesNum() > 0)
+ .build();
+ auditContext.setStatusCode(StatusCode.SUCCESS);
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
+ }
}
@Override
public void reportClientOperation(
ReportShuffleClientOpRequest request,
StreamObserver<ReportShuffleClientOpResponse> responseObserver) {
- final String clientHost = request.getClientHost();
- final int clientPort = request.getClientPort();
- final ShuffleServerId shuffleServer = request.getServer();
- final String operation = request.getOperation();
- LOG.info(clientHost + ":" + clientPort + "->" + operation + "->" +
shuffleServer);
- final ReportShuffleClientOpResponse response =
- ReportShuffleClientOpResponse.newBuilder()
- .setRetMsg("")
- .setStatus(StatusCode.SUCCESS)
- .build();
- responseObserver.onNext(response);
- responseObserver.onCompleted();
+ try (CoordinatorRPCAuditContext auditContext =
createAuditContext("reportClientOperation")) {
+ final String clientHost = request.getClientHost();
+ final int clientPort = request.getClientPort();
+ final ShuffleServerId shuffleServer = request.getServer();
+ final String operation = request.getOperation();
+ auditContext.setArgs(
+ String.format("%s:%s->%s->%s", clientHost, clientPort, operation,
shuffleServer));
+
+ LOG.info(clientHost + ":" + clientPort + "->" + operation + "->" +
shuffleServer);
+ final ReportShuffleClientOpResponse response =
+ ReportShuffleClientOpResponse.newBuilder()
+ .setRetMsg("")
+ .setStatus(StatusCode.SUCCESS)
+ .build();
+ auditContext.setStatusCode(response.getStatus());
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
+ }
}
@Override
public void appHeartbeat(
AppHeartBeatRequest request, StreamObserver<AppHeartBeatResponse>
responseObserver) {
- String appId = request.getAppId();
- coordinatorServer.getApplicationManager().refreshAppId(appId);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got heartbeat from application: {}", appId);
- }
- AppHeartBeatResponse response =
-
AppHeartBeatResponse.newBuilder().setRetMsg("").setStatus(StatusCode.SUCCESS).build();
+ try (CoordinatorRPCAuditContext auditContext =
createAuditContext("appHeartbeat")) {
+ String appId = request.getAppId();
+ auditContext.setAppId(appId);
+ coordinatorServer.getApplicationManager().refreshAppId(appId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got heartbeat from application: {}", appId);
+ }
+ AppHeartBeatResponse response =
+
AppHeartBeatResponse.newBuilder().setRetMsg("").setStatus(StatusCode.SUCCESS).build();
+
+ if (Context.current().isCancelled()) {
+ responseObserver.onError(
+ Status.CANCELLED.withDescription("Cancelled by
client").asRuntimeException());
+ auditContext.setStatusCode("CANCELLED");
+ LOG.warn("Cancelled by client {} for after deadline.", appId);
+ return;
+ }
- if (Context.current().isCancelled()) {
- responseObserver.onError(
- Status.CANCELLED.withDescription("Cancelled by
client").asRuntimeException());
- LOG.warn("Cancelled by client {} for after deadline.", appId);
- return;
+ auditContext.setStatusCode(response.getStatus());
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
}
-
- responseObserver.onNext(response);
- responseObserver.onCompleted();
}
@Override
public void registerApplicationInfo(
ApplicationInfoRequest request, StreamObserver<ApplicationInfoResponse>
responseObserver) {
- String appId = request.getAppId();
- String user = request.getUser();
- coordinatorServer.getApplicationManager().registerApplicationInfo(appId,
user);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got a registered application info: {}", appId);
- }
- ApplicationInfoResponse response =
-
ApplicationInfoResponse.newBuilder().setRetMsg("").setStatus(StatusCode.SUCCESS).build();
+ try (CoordinatorRPCAuditContext auditContext =
createAuditContext("registerApplicationInfo")) {
+ String appId = request.getAppId();
+ String user = request.getUser();
+ auditContext.setAppId(appId).setArgs("user=" + user);
+ coordinatorServer.getApplicationManager().registerApplicationInfo(appId,
user);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got a registered application info: {}", appId);
+ }
+ ApplicationInfoResponse response =
+
ApplicationInfoResponse.newBuilder().setRetMsg("").setStatus(StatusCode.SUCCESS).build();
+
+ if (Context.current().isCancelled()) {
+ responseObserver.onError(
+ Status.CANCELLED.withDescription("Cancelled by
client").asRuntimeException());
+ auditContext.setStatusCode("CANCELLED");
+ LOG.warn("Cancelled by client {} for after deadline.", appId);
+ return;
+ }
- if (Context.current().isCancelled()) {
- responseObserver.onError(
- Status.CANCELLED.withDescription("Cancelled by
client").asRuntimeException());
- LOG.warn("Cancelled by client {} for after deadline.", appId);
- return;
+ auditContext.setStatusCode(response.getStatus());
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
}
-
- responseObserver.onNext(response);
- responseObserver.onCompleted();
}
@Override
public void accessCluster(
AccessClusterRequest request, StreamObserver<AccessClusterResponse>
responseObserver) {
- StatusCode statusCode = StatusCode.SUCCESS;
- AccessClusterResponse response;
- AccessManager accessManager = coordinatorServer.getAccessManager();
-
- AccessInfo accessInfo =
- new AccessInfo(
- request.getAccessId(),
- Sets.newHashSet(request.getTagsList()),
- request.getExtraPropertiesMap(),
- request.getUser());
- AccessCheckResult result = accessManager.handleAccessRequest(accessInfo);
- if (!result.isSuccess()) {
- statusCode = StatusCode.ACCESS_DENIED;
- }
+ try (CoordinatorRPCAuditContext auditContext =
createAuditContext("accessCluster")) {
+ StatusCode statusCode = StatusCode.SUCCESS;
+ AccessClusterResponse response;
+ AccessManager accessManager = coordinatorServer.getAccessManager();
+
+ AccessInfo accessInfo =
+ new AccessInfo(
+ request.getAccessId(),
+ Sets.newHashSet(request.getTagsList()),
+ request.getExtraPropertiesMap(),
+ request.getUser());
+
+ auditContext.setArgs("accessInfo=" + accessInfo);
+
+ AccessCheckResult result = accessManager.handleAccessRequest(accessInfo);
+ if (!result.isSuccess()) {
+ statusCode = StatusCode.ACCESS_DENIED;
+ }
- response =
- AccessClusterResponse.newBuilder()
- .setStatus(statusCode)
- .setRetMsg(result.getMsg())
- .setUuid(result.getUuid())
- .build();
+ response =
+ AccessClusterResponse.newBuilder()
+ .setStatus(statusCode)
+ .setRetMsg(result.getMsg())
+ .setUuid(result.getUuid())
+ .build();
- if (Context.current().isCancelled()) {
- responseObserver.onError(
- Status.CANCELLED.withDescription("Cancelled by
client").asRuntimeException());
- LOG.warn("Cancelled by client {} for after deadline.", accessInfo);
- return;
- }
+ if (Context.current().isCancelled()) {
+ responseObserver.onError(
+ Status.CANCELLED.withDescription("Cancelled by
client").asRuntimeException());
+ auditContext.setStatusCode("CANCELLED");
+ LOG.warn("Cancelled by client {} for after deadline.", accessInfo);
+ return;
+ }
- responseObserver.onNext(response);
- responseObserver.onCompleted();
+ auditContext.setStatusCode(response.getStatus());
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
+ }
}
/** To be compatible with the older client version. */
@Override
public void fetchClientConf(
Empty empty, StreamObserver<FetchClientConfResponse> responseObserver) {
- fetchClientConfImpl(RssClientConfFetchInfo.EMPTY_CLIENT_CONF_FETCH_INFO,
responseObserver);
+ try (CoordinatorRPCAuditContext auditContext =
createAuditContext("fetchClientConf")) {
+ fetchClientConfImpl(RssClientConfFetchInfo.EMPTY_CLIENT_CONF_FETCH_INFO,
responseObserver);
+ auditContext.setStatusCode(StatusCode.SUCCESS);
+ }
}
@Override
public void fetchClientConfV2(
FetchClientConfRequest request, StreamObserver<FetchClientConfResponse>
responseObserver) {
- fetchClientConfImpl(RssClientConfFetchInfo.fromProto(request),
responseObserver);
+ try (CoordinatorRPCAuditContext auditContext =
createAuditContext("fetchClientConfV2")) {
+ fetchClientConfImpl(RssClientConfFetchInfo.fromProto(request),
responseObserver);
+ auditContext.setStatusCode(StatusCode.SUCCESS);
+ }
}
private void fetchClientConfImpl(
@@ -350,44 +414,50 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
public void fetchRemoteStorage(
FetchRemoteStorageRequest request,
StreamObserver<FetchRemoteStorageResponse> responseObserver) {
- FetchRemoteStorageResponse response;
- StatusCode status = StatusCode.SUCCESS;
- String appId = request.getAppId();
- try {
- RemoteStorage.Builder rsBuilder = RemoteStorage.newBuilder();
- RemoteStorageInfo rsInfo =
coordinatorServer.getApplicationManager().pickRemoteStorage(appId);
- if (rsInfo == null) {
- LOG.error("Remote storage of {} do not exist.", appId);
- } else {
- rsBuilder.setPath(rsInfo.getPath());
- for (Map.Entry<String, String> entry :
rsInfo.getConfItems().entrySet()) {
- rsBuilder.addRemoteStorageConf(
- RemoteStorageConfItem.newBuilder()
- .setKey(entry.getKey())
- .setValue(entry.getValue())
- .build());
+ try (CoordinatorRPCAuditContext auditContext =
createAuditContext("fetchRemoteStorage")) {
+ FetchRemoteStorageResponse response;
+ StatusCode status = StatusCode.SUCCESS;
+ String appId = request.getAppId();
+ auditContext.setAppId(appId);
+ try {
+ RemoteStorage.Builder rsBuilder = RemoteStorage.newBuilder();
+ RemoteStorageInfo rsInfo =
+ coordinatorServer.getApplicationManager().pickRemoteStorage(appId);
+ if (rsInfo == null) {
+ LOG.error("Remote storage of {} do not exist.", appId);
+ } else {
+ rsBuilder.setPath(rsInfo.getPath());
+ for (Map.Entry<String, String> entry :
rsInfo.getConfItems().entrySet()) {
+ rsBuilder.addRemoteStorageConf(
+ RemoteStorageConfItem.newBuilder()
+ .setKey(entry.getKey())
+ .setValue(entry.getValue())
+ .build());
+ }
}
+ response =
+ FetchRemoteStorageResponse.newBuilder()
+ .setStatus(status)
+ .setRemoteStorage(rsBuilder.build())
+ .build();
+ } catch (Exception e) {
+ status = StatusCode.INTERNAL_ERROR;
+ response =
FetchRemoteStorageResponse.newBuilder().setStatus(status).build();
+ LOG.error("Error happened when get remote storage for appId[{}]",
appId, e);
}
- response =
- FetchRemoteStorageResponse.newBuilder()
- .setStatus(status)
- .setRemoteStorage(rsBuilder.build())
- .build();
- } catch (Exception e) {
- status = StatusCode.INTERNAL_ERROR;
- response =
FetchRemoteStorageResponse.newBuilder().setStatus(status).build();
- LOG.error("Error happened when get remote storage for appId[{}]", appId,
e);
- }
- if (Context.current().isCancelled()) {
- responseObserver.onError(
- Status.CANCELLED.withDescription("Cancelled by
client").asRuntimeException());
- LOG.warn("Fetch client conf cancelled by client for after deadline.");
- return;
- }
+ if (Context.current().isCancelled()) {
+ responseObserver.onError(
+ Status.CANCELLED.withDescription("Cancelled by
client").asRuntimeException());
+ auditContext.setStatusCode("CANCELLED");
+ LOG.warn("Fetch client conf cancelled by client for after deadline.");
+ return;
+ }
- responseObserver.onNext(response);
- responseObserver.onCompleted();
+ auditContext.setStatusCode(response.getStatus());
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
+ }
}
private void logAssignmentResult(String appId, int shuffleId,
PartitionRangeAssignment pra) {
@@ -437,4 +507,23 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
request.getServerId().getJettyPort(),
request.getStartTimeMs());
}
+
+ /**
+ * Creates a {@link CoordinatorRPCAuditContext} instance.
+ *
+ * @param command the command to be logged by this {@link AuditContext}
+ * @return newly-created {@link CoordinatorRPCAuditContext} instance
+ */
+ private CoordinatorRPCAuditContext createAuditContext(String command) {
+ // Audit log may be enabled during runtime
+ Logger auditLogger = null;
+ if (isRpcAuditLogEnabled) {
+ auditLogger = AUDIT_LOGGER;
+ }
+ CoordinatorRPCAuditContext auditContext = new
CoordinatorRPCAuditContext(auditLogger);
+ if (auditLogger != null) {
+ auditContext.setCommand(command).setCreationTimeNs(System.nanoTime());
+ }
+ return auditContext;
+ }
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRPCAuditContext.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRPCAuditContext.java
new file mode 100644
index 000000000..7a6453bfb
--- /dev/null
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRPCAuditContext.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.audit;
+
+import org.slf4j.Logger;
+
+import org.apache.uniffle.common.audit.AuditContext;
+import org.apache.uniffle.common.rpc.StatusCode;
+
+/** An audit context for coordinator rpc. */
+public class CoordinatorRPCAuditContext implements AuditContext {
+ private final Logger log;
+ private String command;
+ private String statusCode;
+ private long creationTimeNs;
+ private long executionTimeNs;
+ private String appId = "N/A";
+ private int shuffleId = -1;
+ private String args;
+
+ /**
+ * Constructor of {@link CoordinatorRPCAuditContext}.
+ *
+ * @param log the logger to log the audit information
+ */
+ public CoordinatorRPCAuditContext(Logger log) {
+ this.log = log;
+ }
+
+ /**
+   * Sets the command field.
+   *
+   * @param command the command associated with the coordinator rpc
+ * @return this {@link AuditContext} instance
+ */
+ public CoordinatorRPCAuditContext setCommand(String command) {
+ this.command = command;
+ return this;
+ }
+
+ /**
+ * Sets creationTimeNs field.
+ *
+   * @param creationTimeNs the System.nanoTime() when this operation was created; it is only used
+   *     to compute the operation's executionTimeNs
+ * @return this {@link AuditContext} instance
+ */
+ public CoordinatorRPCAuditContext setCreationTimeNs(long creationTimeNs) {
+ this.creationTimeNs = creationTimeNs;
+ return this;
+ }
+
+ /**
+ * Sets status code field.
+ *
+ * @param statusCode the status code
+ * @return this {@link AuditContext} instance
+ */
+ public CoordinatorRPCAuditContext setStatusCode(StatusCode statusCode) {
+ if (statusCode == null) {
+ this.statusCode = "UNKNOWN";
+ } else {
+ this.statusCode = statusCode.name();
+ }
+ return this;
+ }
+
+ /**
+ * Sets status code field.
+ *
+ * @param statusCode the status code
+ * @return this {@link AuditContext} instance
+ */
+ public CoordinatorRPCAuditContext setStatusCode(
+ org.apache.uniffle.proto.RssProtos.StatusCode statusCode) {
+ if (statusCode == null) {
+ this.statusCode = "UNKNOWN";
+ } else {
+ this.statusCode = statusCode.name();
+ }
+ return this;
+ }
+
+ /**
+ * Sets status code field.
+ *
+ * @param statusCode the status code
+ * @return this {@link AuditContext} instance
+ */
+ public CoordinatorRPCAuditContext setStatusCode(String statusCode) {
+ this.statusCode = statusCode;
+ return this;
+ }
+
+ @Override
+ public void close() {
+ if (log == null) {
+ return;
+ }
+ executionTimeNs = System.nanoTime() - creationTimeNs;
+ log.info(toString());
+ }
+
+ @Override
+ public String toString() {
+ String line =
+ String.format(
+
"cmd=%s\tstatusCode=%s\tappId=%s\tshuffleId=%s\texecutionTimeUs=%d\t",
+ command, statusCode, appId, shuffleId, executionTimeNs / 1000);
+ if (args != null) {
+ line += String.format("args{%s}", args);
+ }
+ return line;
+ }
+
+ public CoordinatorRPCAuditContext setAppId(String appId) {
+ this.appId = appId;
+ return this;
+ }
+
+ public CoordinatorRPCAuditContext setShuffleId(int shuffleId) {
+ this.shuffleId = shuffleId;
+ return this;
+ }
+
+ public CoordinatorRPCAuditContext setArgs(String args) {
+ this.args = args;
+ return this;
+ }
+}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/util/CoordinatorUtils.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/util/CoordinatorUtils.java
index 453319320..c0c5aceb5 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/util/CoordinatorUtils.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/util/CoordinatorUtils.java
@@ -47,7 +47,10 @@ public class CoordinatorUtils {
PartitionRangeAssignment pra) {
List<RssProtos.PartitionRangeAssignment> praList =
pra.convertToGrpcProto();
- return
GetShuffleAssignmentsResponse.newBuilder().addAllAssignments(praList).build();
+ return GetShuffleAssignmentsResponse.newBuilder()
+ .addAllAssignments(praList)
+ .setStatus(RssProtos.StatusCode.SUCCESS)
+ .build();
}
public static int nextIdx(int idx, int size) {
diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md
index e443494c1..464ce2934 100644
--- a/docs/coordinator_guide.md
+++ b/docs/coordinator_guide.md
@@ -82,33 +82,34 @@ This document will introduce how to deploy Uniffle
coordinators.
## Configuration
### Common settings
-|Property Name|Default| Description
|
-|---|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-|rss.coordinator.server.heartbeat.timeout|30000| Timeout if can't get
heartbeat from shuffle server
|
-|rss.coordinator.server.periodic.output.interval.times|30| The periodic
interval times of output alive nodes. The interval sec can be calculated by
(rss.coordinator.server.heartbeat.timeout/3 *
rss.coordinator.server.periodic.output.interval.times). Default output interval
is 5min. |
-|rss.coordinator.assignment.strategy|PARTITION_BALANCE| Strategy for assigning
shuffle server, PARTITION_BALANCE should be used for workload balance
|
-|rss.coordinator.app.expired|60000| Application expired time (ms), the
heartbeat interval should be less than it
|
-|rss.coordinator.shuffle.nodes.max|9| The max number of shuffle server when do
the assignment
|
-|rss.coordinator.dynamicClientConf.path|-| The path of configuration file
which have default conf for rss client
|
-|rss.coordinator.exclude.nodes.file.path|-| The path of configuration file
which have exclude nodes
|
-|rss.coordinator.exclude.nodes.check.interval.ms|60000| Update interval (ms)
for exclude nodes
|
-|rss.coordinator.access.checkers|org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker|
The access checkers will be used when the spark client use the
DelegationShuffleManager, which will decide whether to use rss according to the
result of the specified access checkers
|
-|rss.coordinator.access.loadChecker.memory.percentage|15.0| The minimal
percentage of available memory percentage of a server
|
-|rss.coordinator.dynamicClientConf.enabled|false| whether to enable dynamic
client conf, which will be fetched by spark client
|
-|rss.coordinator.dynamicClientConf.path|-| The dynamic client conf of this
cluster and can be stored in HADOOP FS or local
|
-|rss.coordinator.dynamicClientConf.updateIntervalSec|120| The dynamic client
conf update interval in seconds
|
-|rss.coordinator.remote.storage.cluster.conf|-| Remote Storage Cluster related
conf with format $clusterId,$key=$value, separated by ';'
|
-|rss.rpc.server.port|-| RPC port for coordinator
|
-|rss.jetty.http.port|-| Http port for coordinator
|
-|rss.coordinator.remote.storage.select.strategy|APP_BALANCE| Strategy for
selecting the remote path
|
-|rss.coordinator.remote.storage.io.sample.schedule.time|60000| The time of
scheduling the read and write time of the paths to obtain different HADOOP FS
|
-|rss.coordinator.remote.storage.io.sample.file.size|204800000| The size of the
file that the scheduled thread reads and writes
|
-|rss.coordinator.remote.storage.io.sample.access.times|3| The number of times
to read and write HADOOP FS files
|
-|rss.coordinator.startup-silent-period.enabled|false| Enable the
startup-silent-period to reject the assignment requests for avoiding partial
assignments. To avoid service interruption, this mechanism is disabled by
default. Especially it's recommended to use in coordinator HA mode when
restarting single coordinator. |
-|rss.coordinator.startup-silent-period.duration|20000| The waiting
duration(ms) when conf of rss.coordinator.startup-silent-period.enabled is
enabled.
|
-|rss.coordinator.select.partition.strategy|CONTINUOUS| There are two
strategies for selecting partitions: ROUND and CONTINUOUS. ROUND will poll to
allocate partitions to ShuffleServer, and CONTINUOUS will try to allocate
consecutive partitions to ShuffleServer, this feature can improve performance
in AQE scenarios. |
-|rss.metrics.reporter.class|-| The class of metrics reporter.
|
-|rss.reconfigure.interval.sec|5| Reconfigure check interval.
|
+| Property Name | Default
| Description
|
+|--------------------------------------------------------|------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| rss.coordinator.server.heartbeat.timeout | 30000
| Timeout if can't get
heartbeat from shuffle server
|
+| rss.coordinator.server.periodic.output.interval.times | 30
| The periodic interval
times of output alive nodes. The interval sec can be calculated by
(rss.coordinator.server.heartbeat.timeout/3 *
rss.coordinator.server.periodic.output.interval.times). Default output interval
is 5min. |
+| rss.coordinator.assignment.strategy | PARTITION_BALANCE
| Strategy for assigning
shuffle server, PARTITION_BALANCE should be used for workload balance
|
+| rss.coordinator.app.expired | 60000
| Application expired time
(ms), the heartbeat interval should be less than it
|
+| rss.coordinator.shuffle.nodes.max | 9
| The max number of shuffle
server when do the assignment
|
+| rss.coordinator.dynamicClientConf.path | -
| The path of configuration
file which have default conf for rss client
|
+| rss.coordinator.exclude.nodes.file.path | -
| The path of configuration
file which have exclude nodes
|
+| rss.coordinator.exclude.nodes.check.interval.ms | 60000
| Update interval (ms) for
exclude nodes
|
+| rss.coordinator.access.checkers |
org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker | The
access checkers will be used when the spark client use the
DelegationShuffleManager, which will decide whether to use rss according to the
result of the specified access checkers
|
+| rss.coordinator.access.loadChecker.memory.percentage | 15.0
| The minimal percentage of
available memory percentage of a server
|
+| rss.coordinator.dynamicClientConf.enabled | false
| whether to enable dynamic
client conf, which will be fetched by spark client
|
+| rss.coordinator.dynamicClientConf.path | -
| The dynamic client conf of
this cluster and can be stored in HADOOP FS or local
|
+| rss.coordinator.dynamicClientConf.updateIntervalSec | 120
| The dynamic client conf
update interval in seconds
|
+| rss.coordinator.remote.storage.cluster.conf | -
| Remote Storage Cluster
related conf with format $clusterId,$key=$value, separated by ';'
|
+| rss.rpc.server.port | -
| RPC port for coordinator
|
+| rss.jetty.http.port | -
| Http port for coordinator
|
+| rss.coordinator.remote.storage.select.strategy | APP_BALANCE
| Strategy for selecting the
remote path
|
+| rss.coordinator.remote.storage.io.sample.schedule.time | 60000
| The time of scheduling the
read and write time of the paths to obtain different HADOOP FS
|
+| rss.coordinator.remote.storage.io.sample.file.size | 204800000
| The size of the file that
the scheduled thread reads and writes
|
+| rss.coordinator.remote.storage.io.sample.access.times | 3
| The number of times to
read and write HADOOP FS files
|
+| rss.coordinator.startup-silent-period.enabled | false
| Enable the
startup-silent-period to reject the assignment requests for avoiding partial
assignments. To avoid service interruption, this mechanism is disabled by
default. Especially it's recommended to use in coordinator HA mode when
restarting single coordinator. |
+| rss.coordinator.startup-silent-period.duration | 20000
| The waiting duration(ms)
when conf of rss.coordinator.startup-silent-period.enabled is enabled.
|
+| rss.coordinator.select.partition.strategy | CONTINUOUS
| There are two strategies
for selecting partitions: ROUND and CONTINUOUS. ROUND will poll to allocate
partitions to ShuffleServer, and CONTINUOUS will try to allocate consecutive
partitions to ShuffleServer, this feature can improve performance in AQE
scenarios. |
+| rss.metrics.reporter.class | -
| The class of metrics
reporter.
|
+| rss.reconfigure.interval.sec | 5
| Reconfigure check
interval.
|
+| rss.coordinator.rpc.audit.log.enabled | true
| When set to true, for
auditing purposes, the coordinator will log an audit record for every RPC
request operation.
|
### AccessClusterLoadChecker settings
|Property Name|Default| Description|