This is an automated email from the ASF dual-hosted git repository.

rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 658ca1f79 Support coordinator RPC audit log (#1979)
658ca1f79 is described below

commit 658ca1f795c148e29098c12b77701e0da1e4206c
Author: maobaolong <[email protected]>
AuthorDate: Thu Aug 1 10:55:08 2024 +0800

    Support coordinator RPC audit log (#1979)
    
    ### What changes were proposed in this pull request?
    
    Support recording an audit log for coordinator RPC operations.
    
    ### Why are the changes needed?
    
    Fix: #1980
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Start the coordinator and server, then watch the new `coordinator_rpc_audit.log`
    
    
    ```
    [2024-07-29 17:20:18.986] cmd=fetchClientConfV2 statusCode=SUCCESS      
appId=N/A       shuffleId=-1    executionTimeUs=6130
    [2024-07-29 17:20:20.900] cmd=heartbeat statusCode=SUCCESS      appId=N/A   
    shuffleId=-1    executionTimeUs=618     args{serverNode=XXX-19979}
    [2024-07-29 17:20:51.716] cmd=getShuffleAssignments     statusCode=SUCCESS  
    appId=app-20240729172021-0002_1722244818120     shuffleId=0     
executionTimeUs=18097   args{partitionNum=2, partitionNumPerRange=1, replica=1, 
requiredTags=[ss_v5, GRPC], requiredShuffleServerNumber=-1, faultyServerIds=[], 
stageId=-1, stageAttemptNumber=0, isReassign=false}
    [2024-07-29 17:20:51.981] cmd=registerApplicationInfo   statusCode=SUCCESS  
    appId=app-20240729172021-0002_1722244818120     shuffleId=-1    
executionTimeUs=4142    args{user=user}
    [2024-07-29 18:13:44.283] cmd=appHeartbeat      statusCode=SUCCESS      
appId=app-20240729173334-0000_1722245612411     shuffleId=-1    
executionTimeUs=339
    
    ```
---
 bin/start-coordinator.sh                           |   3 +-
 conf/local_dev/log4j2.xml                          |  10 +
 conf/log4j2.xml                                    |  10 +
 .../uniffle/coordinator/CoordinatorConf.java       |   6 +
 .../coordinator/CoordinatorGrpcService.java        | 455 ++++++++++++---------
 .../audit/CoordinatorRPCAuditContext.java          | 145 +++++++
 .../uniffle/coordinator/util/CoordinatorUtils.java |   5 +-
 docs/coordinator_guide.md                          |  55 +--
 8 files changed, 477 insertions(+), 212 deletions(-)

diff --git a/bin/start-coordinator.sh b/bin/start-coordinator.sh
index 05bc4c104..2e790f8a0 100755
--- a/bin/start-coordinator.sh
+++ b/bin/start-coordinator.sh
@@ -30,6 +30,7 @@ COORDINATOR_CONF_FILE="${RSS_CONF_DIR}/coordinator.conf"
 JAR_DIR="${RSS_HOME}/jars"
 LOG_CONF_FILE="${RSS_CONF_DIR}/log4j2.xml"
 LOG_PATH="${RSS_LOG_DIR}/coordinator.log"
+COORDINATOR_RPC_AUDIT_LOG_PATH="${RSS_LOG_DIR}/coordinator_rpc_audit.log"
 
 MAIN_CLASS="org.apache.uniffle.coordinator.CoordinatorServer"
 
@@ -96,7 +97,7 @@ GC_LOG_ARGS_NEW=" -XX:+IgnoreUnrecognizedVMOptions \
 JVM_LOG_ARGS=""
 
 if [ -f ${LOG_CONF_FILE} ]; then
-  JVM_LOG_ARGS=" -Dlog4j2.configurationFile=file:${LOG_CONF_FILE} 
-Dlog.path=${LOG_PATH}"
+  JVM_LOG_ARGS=" -Dlog4j2.configurationFile=file:${LOG_CONF_FILE} 
-Dlog.path=${LOG_PATH} 
-Dcoordinator.rpc.audit.log.path=${COORDINATOR_RPC_AUDIT_LOG_PATH}"
 else
   echo "Exit with error: ${LOG_CONF_FILE} file doesn't exist."
   exit 1
diff --git a/conf/local_dev/log4j2.xml b/conf/local_dev/log4j2.xml
index 6e6eb118c..9c062a429 100644
--- a/conf/local_dev/log4j2.xml
+++ b/conf/local_dev/log4j2.xml
@@ -41,6 +41,13 @@
       </Policies>
       <DefaultRolloverStrategy max="10"/>
     </RollingFile>
+    <RollingFile name="coordinatorRpcAuditAppender" 
fileName="${sys:coordinator.rpc.audit.log.path}" 
filePattern="${sys:coordinator.rpc.audit.log.path}.%i">
+      <PatternLayout pattern="[%d{yyyy-MM-dd HH:mm:ss.SSS}] %m%n"/>
+      <Policies>
+        <SizeBasedTriggeringPolicy size="2GB"/>
+      </Policies>
+      <DefaultRolloverStrategy max="10"/>
+    </RollingFile>
   </Appenders>
   <Loggers>
     <Root level="info">
@@ -65,5 +72,8 @@
     <Logger name="SHUFFLE_SERVER_RPC_AUDIT_LOG" level="INFO" 
additivity="false">
       <AppenderRef ref="shuffleServerRpcAuditAppender"/>
     </Logger>
+    <Logger name="COORDINATOR_RPC_AUDIT_LOG" level="INFO" additivity="false">
+      <AppenderRef ref="coordinatorRpcAuditAppender"/>
+    </Logger>
   </Loggers>
 </Configuration>
diff --git a/conf/log4j2.xml b/conf/log4j2.xml
index 6e6eb118c..9c062a429 100644
--- a/conf/log4j2.xml
+++ b/conf/log4j2.xml
@@ -41,6 +41,13 @@
       </Policies>
       <DefaultRolloverStrategy max="10"/>
     </RollingFile>
+    <RollingFile name="coordinatorRpcAuditAppender" 
fileName="${sys:coordinator.rpc.audit.log.path}" 
filePattern="${sys:coordinator.rpc.audit.log.path}.%i">
+      <PatternLayout pattern="[%d{yyyy-MM-dd HH:mm:ss.SSS}] %m%n"/>
+      <Policies>
+        <SizeBasedTriggeringPolicy size="2GB"/>
+      </Policies>
+      <DefaultRolloverStrategy max="10"/>
+    </RollingFile>
   </Appenders>
   <Loggers>
     <Root level="info">
@@ -65,5 +72,8 @@
     <Logger name="SHUFFLE_SERVER_RPC_AUDIT_LOG" level="INFO" 
additivity="false">
       <AppenderRef ref="shuffleServerRpcAuditAppender"/>
     </Logger>
+    <Logger name="COORDINATOR_RPC_AUDIT_LOG" level="INFO" additivity="false">
+      <AppenderRef ref="coordinatorRpcAuditAppender"/>
+    </Logger>
   </Loggers>
 </Configuration>
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
index e68b4cbb0..6ed93edce 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
@@ -244,6 +244,12 @@ public class CoordinatorConf extends RssBaseConf {
           .defaultValue(1000)
           .withDescription(
               "The max number of clients that communicating with nodes and 
storing in the cache.");
+  public static final ConfigOption<Boolean> COORDINATOR_RPC_AUDIT_LOG_ENABLED =
+      ConfigOptions.key("rss.coordinator.rpc.audit.log.enabled")
+          .booleanType()
+          .defaultValue(true)
+          .withDescription(
+              "When set to true, for auditing purposes, the coordinator will 
log audit records for every rpc request operation. ");
 
   public CoordinatorConf() {}
 
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index 2b6c1c428..a4376576d 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -35,9 +35,11 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ServerStatus;
+import org.apache.uniffle.common.audit.AuditContext;
 import org.apache.uniffle.common.storage.StorageInfoUtils;
 import org.apache.uniffle.coordinator.access.AccessCheckResult;
 import org.apache.uniffle.coordinator.access.AccessInfo;
+import org.apache.uniffle.coordinator.audit.CoordinatorRPCAuditContext;
 import org.apache.uniffle.coordinator.conf.RssClientConfFetchInfo;
 import 
org.apache.uniffle.coordinator.strategy.assignment.PartitionRangeAssignment;
 import org.apache.uniffle.coordinator.util.CoordinatorUtils;
@@ -71,11 +73,17 @@ import org.apache.uniffle.proto.RssProtos.StatusCode;
 public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorServerImplBase {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(CoordinatorGrpcService.class);
+  private static final Logger AUDIT_LOGGER = 
LoggerFactory.getLogger("COORDINATOR_RPC_AUDIT_LOG");
 
   private final CoordinatorServer coordinatorServer;
+  private final boolean isRpcAuditLogEnabled;
 
   public CoordinatorGrpcService(CoordinatorServer coordinatorServer) {
     this.coordinatorServer = coordinatorServer;
+    isRpcAuditLogEnabled =
+        coordinatorServer
+            .getCoordinatorConf()
+            .getBoolean(CoordinatorConf.COORDINATOR_RPC_AUDIT_LOG_ENABLED);
   }
 
   @Override
@@ -106,71 +114,91 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
   public void getShuffleAssignments(
       GetShuffleServerRequest request,
       StreamObserver<GetShuffleAssignmentsResponse> responseObserver) {
-    final String appId = request.getApplicationId();
-    final int shuffleId = request.getShuffleId();
-    final int partitionNum = request.getPartitionNum();
-    final int partitionNumPerRange = request.getPartitionNumPerRange();
-    final int replica = request.getDataReplica();
-    final Set<String> requiredTags = 
Sets.newHashSet(request.getRequireTagsList());
-    final int requiredShuffleServerNumber = 
request.getAssignmentShuffleServerNumber();
-    final int estimateTaskConcurrency = request.getEstimateTaskConcurrency();
-    final Set<String> faultyServerIds = new 
HashSet<>(request.getFaultyServerIdsList());
-
-    LOG.info(
-        "Request of getShuffleAssignments for appId[{}], shuffleId[{}], 
partitionNum[{}],"
-            + " partitionNumPerRange[{}], replica[{}], requiredTags[{}], 
requiredShuffleServerNumber[{}],"
-            + " faultyServerIds[{}], stageId[{}], stageAttemptNumber[{}], 
isReassign[{}]",
-        appId,
-        shuffleId,
-        partitionNum,
-        partitionNumPerRange,
-        replica,
-        requiredTags,
-        requiredShuffleServerNumber,
-        faultyServerIds.size(),
-        request.getStageId(),
-        request.getStageAttemptNumber(),
-        request.getReassign());
-
-    GetShuffleAssignmentsResponse response;
-    try {
-      if (!coordinatorServer.getClusterManager().isReadyForServe()) {
-        throw new Exception("Coordinator is out-of-service when in starting.");
-      }
-
-      final PartitionRangeAssignment pra =
-          coordinatorServer
-              .getAssignmentStrategy()
-              .assign(
-                  partitionNum,
-                  partitionNumPerRange,
-                  replica,
-                  requiredTags,
-                  requiredShuffleServerNumber,
-                  estimateTaskConcurrency,
-                  faultyServerIds);
-      response = CoordinatorUtils.toGetShuffleAssignmentsResponse(pra);
-      logAssignmentResult(appId, shuffleId, pra);
-      responseObserver.onNext(response);
-    } catch (Exception e) {
-      LOG.error(
-          "Errors on getting shuffle assignments for app: {}, shuffleId: {}, 
partitionNum: {}, "
-              + "partitionNumPerRange: {}, replica: {}, requiredTags: {}",
+    try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("getShuffleAssignments")) {
+      final String appId = request.getApplicationId();
+      final int shuffleId = request.getShuffleId();
+      final int partitionNum = request.getPartitionNum();
+      final int partitionNumPerRange = request.getPartitionNumPerRange();
+      final int replica = request.getDataReplica();
+      final Set<String> requiredTags = 
Sets.newHashSet(request.getRequireTagsList());
+      final int requiredShuffleServerNumber = 
request.getAssignmentShuffleServerNumber();
+      final int estimateTaskConcurrency = request.getEstimateTaskConcurrency();
+      final Set<String> faultyServerIds = new 
HashSet<>(request.getFaultyServerIdsList());
+
+      auditContext.setAppId(appId).setShuffleId(shuffleId);
+      auditContext.setArgs(
+          String.format(
+              "partitionNum=%d, partitionNumPerRange=%d, replica=%d, 
requiredTags=%s, "
+                  + "requiredShuffleServerNumber=%d, faultyServerIds=%s, 
stageId=%d, stageAttemptNumber=%d, isReassign=%b",
+              partitionNum,
+              partitionNumPerRange,
+              replica,
+              requiredTags,
+              requiredShuffleServerNumber,
+              faultyServerIds,
+              request.getStageId(),
+              request.getStageAttemptNumber(),
+              request.getReassign()));
+
+      LOG.info(
+          "Request of getShuffleAssignments for appId[{}], shuffleId[{}], 
partitionNum[{}],"
+              + " partitionNumPerRange[{}], replica[{}], requiredTags[{}], 
requiredShuffleServerNumber[{}],"
+              + " faultyServerIds[{}], stageId[{}], stageAttemptNumber[{}], 
isReassign[{}]",
           appId,
           shuffleId,
           partitionNum,
           partitionNumPerRange,
           replica,
           requiredTags,
-          e);
-      response =
-          GetShuffleAssignmentsResponse.newBuilder()
-              .setStatus(StatusCode.INTERNAL_ERROR)
-              .setRetMsg(e.getMessage())
-              .build();
-      responseObserver.onNext(response);
-    } finally {
-      responseObserver.onCompleted();
+          requiredShuffleServerNumber,
+          faultyServerIds.size(),
+          request.getStageId(),
+          request.getStageAttemptNumber(),
+          request.getReassign());
+
+      GetShuffleAssignmentsResponse response = null;
+      try {
+        if (!coordinatorServer.getClusterManager().isReadyForServe()) {
+          throw new Exception("Coordinator is out-of-service when in 
starting.");
+        }
+
+        final PartitionRangeAssignment pra =
+            coordinatorServer
+                .getAssignmentStrategy()
+                .assign(
+                    partitionNum,
+                    partitionNumPerRange,
+                    replica,
+                    requiredTags,
+                    requiredShuffleServerNumber,
+                    estimateTaskConcurrency,
+                    faultyServerIds);
+        response = CoordinatorUtils.toGetShuffleAssignmentsResponse(pra);
+        logAssignmentResult(appId, shuffleId, pra);
+        responseObserver.onNext(response);
+      } catch (Exception e) {
+        LOG.error(
+            "Errors on getting shuffle assignments for app: {}, shuffleId: {}, 
partitionNum: {}, "
+                + "partitionNumPerRange: {}, replica: {}, requiredTags: {}",
+            appId,
+            shuffleId,
+            partitionNum,
+            partitionNumPerRange,
+            replica,
+            requiredTags,
+            e);
+        response =
+            GetShuffleAssignmentsResponse.newBuilder()
+                .setStatus(StatusCode.INTERNAL_ERROR)
+                .setRetMsg(e.getMessage())
+                .build();
+        responseObserver.onNext(response);
+      } finally {
+        if (response != null) {
+          auditContext.setStatusCode(response.getStatus());
+        }
+        responseObserver.onCompleted();
+      }
     }
   }
 
@@ -178,141 +206,177 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
   public void heartbeat(
       ShuffleServerHeartBeatRequest request,
       StreamObserver<ShuffleServerHeartBeatResponse> responseObserver) {
-    final ServerNode serverNode = toServerNode(request);
-    coordinatorServer.getClusterManager().add(serverNode);
-    final ShuffleServerHeartBeatResponse response =
-        ShuffleServerHeartBeatResponse.newBuilder()
-            .setRetMsg("")
-            .setStatus(StatusCode.SUCCESS)
-            .build();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Got heartbeat from {}", serverNode);
+    try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("heartbeat")) {
+      final ServerNode serverNode = toServerNode(request);
+      auditContext.setArgs("serverNode=" + serverNode.getId());
+      coordinatorServer.getClusterManager().add(serverNode);
+      final ShuffleServerHeartBeatResponse response =
+          ShuffleServerHeartBeatResponse.newBuilder()
+              .setRetMsg("")
+              .setStatus(StatusCode.SUCCESS)
+              .build();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Got heartbeat from {}", serverNode);
+      }
+      auditContext.setStatusCode(response.getStatus());
+      responseObserver.onNext(response);
+      responseObserver.onCompleted();
     }
-    responseObserver.onNext(response);
-    responseObserver.onCompleted();
   }
 
   @Override
   public void checkServiceAvailable(
       Empty request, StreamObserver<CheckServiceAvailableResponse> 
responseObserver) {
-    final CheckServiceAvailableResponse response =
-        CheckServiceAvailableResponse.newBuilder()
-            .setAvailable(coordinatorServer.getClusterManager().getNodesNum() 
> 0)
-            .build();
-    responseObserver.onNext(response);
-    responseObserver.onCompleted();
+    try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("checkServiceAvailable")) {
+      final CheckServiceAvailableResponse response =
+          CheckServiceAvailableResponse.newBuilder()
+              
.setAvailable(coordinatorServer.getClusterManager().getNodesNum() > 0)
+              .build();
+      auditContext.setStatusCode(StatusCode.SUCCESS);
+      responseObserver.onNext(response);
+      responseObserver.onCompleted();
+    }
   }
 
   @Override
   public void reportClientOperation(
       ReportShuffleClientOpRequest request,
       StreamObserver<ReportShuffleClientOpResponse> responseObserver) {
-    final String clientHost = request.getClientHost();
-    final int clientPort = request.getClientPort();
-    final ShuffleServerId shuffleServer = request.getServer();
-    final String operation = request.getOperation();
-    LOG.info(clientHost + ":" + clientPort + "->" + operation + "->" + 
shuffleServer);
-    final ReportShuffleClientOpResponse response =
-        ReportShuffleClientOpResponse.newBuilder()
-            .setRetMsg("")
-            .setStatus(StatusCode.SUCCESS)
-            .build();
-    responseObserver.onNext(response);
-    responseObserver.onCompleted();
+    try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("reportClientOperation")) {
+      final String clientHost = request.getClientHost();
+      final int clientPort = request.getClientPort();
+      final ShuffleServerId shuffleServer = request.getServer();
+      final String operation = request.getOperation();
+      auditContext.setArgs(
+          String.format("%s:%s->%s->%s", clientHost, clientPort, operation, 
shuffleServer));
+
+      LOG.info(clientHost + ":" + clientPort + "->" + operation + "->" + 
shuffleServer);
+      final ReportShuffleClientOpResponse response =
+          ReportShuffleClientOpResponse.newBuilder()
+              .setRetMsg("")
+              .setStatus(StatusCode.SUCCESS)
+              .build();
+      auditContext.setStatusCode(response.getStatus());
+      responseObserver.onNext(response);
+      responseObserver.onCompleted();
+    }
   }
 
   @Override
   public void appHeartbeat(
       AppHeartBeatRequest request, StreamObserver<AppHeartBeatResponse> 
responseObserver) {
-    String appId = request.getAppId();
-    coordinatorServer.getApplicationManager().refreshAppId(appId);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Got heartbeat from application: {}", appId);
-    }
-    AppHeartBeatResponse response =
-        
AppHeartBeatResponse.newBuilder().setRetMsg("").setStatus(StatusCode.SUCCESS).build();
+    try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("appHeartbeat")) {
+      String appId = request.getAppId();
+      auditContext.setAppId(appId);
+      coordinatorServer.getApplicationManager().refreshAppId(appId);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Got heartbeat from application: {}", appId);
+      }
+      AppHeartBeatResponse response =
+          
AppHeartBeatResponse.newBuilder().setRetMsg("").setStatus(StatusCode.SUCCESS).build();
+
+      if (Context.current().isCancelled()) {
+        responseObserver.onError(
+            Status.CANCELLED.withDescription("Cancelled by 
client").asRuntimeException());
+        auditContext.setStatusCode("CANCELLED");
+        LOG.warn("Cancelled by client {} for after deadline.", appId);
+        return;
+      }
 
-    if (Context.current().isCancelled()) {
-      responseObserver.onError(
-          Status.CANCELLED.withDescription("Cancelled by 
client").asRuntimeException());
-      LOG.warn("Cancelled by client {} for after deadline.", appId);
-      return;
+      auditContext.setStatusCode(response.getStatus());
+      responseObserver.onNext(response);
+      responseObserver.onCompleted();
     }
-
-    responseObserver.onNext(response);
-    responseObserver.onCompleted();
   }
 
   @Override
   public void registerApplicationInfo(
       ApplicationInfoRequest request, StreamObserver<ApplicationInfoResponse> 
responseObserver) {
-    String appId = request.getAppId();
-    String user = request.getUser();
-    coordinatorServer.getApplicationManager().registerApplicationInfo(appId, 
user);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Got a registered application info: {}", appId);
-    }
-    ApplicationInfoResponse response =
-        
ApplicationInfoResponse.newBuilder().setRetMsg("").setStatus(StatusCode.SUCCESS).build();
+    try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("registerApplicationInfo")) {
+      String appId = request.getAppId();
+      String user = request.getUser();
+      auditContext.setAppId(appId).setArgs("user=" + user);
+      coordinatorServer.getApplicationManager().registerApplicationInfo(appId, 
user);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Got a registered application info: {}", appId);
+      }
+      ApplicationInfoResponse response =
+          
ApplicationInfoResponse.newBuilder().setRetMsg("").setStatus(StatusCode.SUCCESS).build();
+
+      if (Context.current().isCancelled()) {
+        responseObserver.onError(
+            Status.CANCELLED.withDescription("Cancelled by 
client").asRuntimeException());
+        auditContext.setStatusCode("CANCELLED");
+        LOG.warn("Cancelled by client {} for after deadline.", appId);
+        return;
+      }
 
-    if (Context.current().isCancelled()) {
-      responseObserver.onError(
-          Status.CANCELLED.withDescription("Cancelled by 
client").asRuntimeException());
-      LOG.warn("Cancelled by client {} for after deadline.", appId);
-      return;
+      auditContext.setStatusCode(response.getStatus());
+      responseObserver.onNext(response);
+      responseObserver.onCompleted();
     }
-
-    responseObserver.onNext(response);
-    responseObserver.onCompleted();
   }
 
   @Override
   public void accessCluster(
       AccessClusterRequest request, StreamObserver<AccessClusterResponse> 
responseObserver) {
-    StatusCode statusCode = StatusCode.SUCCESS;
-    AccessClusterResponse response;
-    AccessManager accessManager = coordinatorServer.getAccessManager();
-
-    AccessInfo accessInfo =
-        new AccessInfo(
-            request.getAccessId(),
-            Sets.newHashSet(request.getTagsList()),
-            request.getExtraPropertiesMap(),
-            request.getUser());
-    AccessCheckResult result = accessManager.handleAccessRequest(accessInfo);
-    if (!result.isSuccess()) {
-      statusCode = StatusCode.ACCESS_DENIED;
-    }
+    try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("accessCluster")) {
+      StatusCode statusCode = StatusCode.SUCCESS;
+      AccessClusterResponse response;
+      AccessManager accessManager = coordinatorServer.getAccessManager();
+
+      AccessInfo accessInfo =
+          new AccessInfo(
+              request.getAccessId(),
+              Sets.newHashSet(request.getTagsList()),
+              request.getExtraPropertiesMap(),
+              request.getUser());
+
+      auditContext.setArgs("accessInfo=" + accessInfo);
+
+      AccessCheckResult result = accessManager.handleAccessRequest(accessInfo);
+      if (!result.isSuccess()) {
+        statusCode = StatusCode.ACCESS_DENIED;
+      }
 
-    response =
-        AccessClusterResponse.newBuilder()
-            .setStatus(statusCode)
-            .setRetMsg(result.getMsg())
-            .setUuid(result.getUuid())
-            .build();
+      response =
+          AccessClusterResponse.newBuilder()
+              .setStatus(statusCode)
+              .setRetMsg(result.getMsg())
+              .setUuid(result.getUuid())
+              .build();
 
-    if (Context.current().isCancelled()) {
-      responseObserver.onError(
-          Status.CANCELLED.withDescription("Cancelled by 
client").asRuntimeException());
-      LOG.warn("Cancelled by client {} for after deadline.", accessInfo);
-      return;
-    }
+      if (Context.current().isCancelled()) {
+        responseObserver.onError(
+            Status.CANCELLED.withDescription("Cancelled by 
client").asRuntimeException());
+        auditContext.setStatusCode("CANCELLED");
+        LOG.warn("Cancelled by client {} for after deadline.", accessInfo);
+        return;
+      }
 
-    responseObserver.onNext(response);
-    responseObserver.onCompleted();
+      auditContext.setStatusCode(response.getStatus());
+      responseObserver.onNext(response);
+      responseObserver.onCompleted();
+    }
   }
 
   /** To be compatible with the older client version. */
   @Override
   public void fetchClientConf(
       Empty empty, StreamObserver<FetchClientConfResponse> responseObserver) {
-    fetchClientConfImpl(RssClientConfFetchInfo.EMPTY_CLIENT_CONF_FETCH_INFO, 
responseObserver);
+    try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("fetchClientConf")) {
+      fetchClientConfImpl(RssClientConfFetchInfo.EMPTY_CLIENT_CONF_FETCH_INFO, 
responseObserver);
+      auditContext.setStatusCode(StatusCode.SUCCESS);
+    }
   }
 
   @Override
   public void fetchClientConfV2(
       FetchClientConfRequest request, StreamObserver<FetchClientConfResponse> 
responseObserver) {
-    fetchClientConfImpl(RssClientConfFetchInfo.fromProto(request), 
responseObserver);
+    try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("fetchClientConfV2")) {
+      fetchClientConfImpl(RssClientConfFetchInfo.fromProto(request), 
responseObserver);
+      auditContext.setStatusCode(StatusCode.SUCCESS);
+    }
   }
 
   private void fetchClientConfImpl(
@@ -350,44 +414,50 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
   public void fetchRemoteStorage(
       FetchRemoteStorageRequest request,
       StreamObserver<FetchRemoteStorageResponse> responseObserver) {
-    FetchRemoteStorageResponse response;
-    StatusCode status = StatusCode.SUCCESS;
-    String appId = request.getAppId();
-    try {
-      RemoteStorage.Builder rsBuilder = RemoteStorage.newBuilder();
-      RemoteStorageInfo rsInfo = 
coordinatorServer.getApplicationManager().pickRemoteStorage(appId);
-      if (rsInfo == null) {
-        LOG.error("Remote storage of {} do not exist.", appId);
-      } else {
-        rsBuilder.setPath(rsInfo.getPath());
-        for (Map.Entry<String, String> entry : 
rsInfo.getConfItems().entrySet()) {
-          rsBuilder.addRemoteStorageConf(
-              RemoteStorageConfItem.newBuilder()
-                  .setKey(entry.getKey())
-                  .setValue(entry.getValue())
-                  .build());
+    try (CoordinatorRPCAuditContext auditContext = 
createAuditContext("fetchRemoteStorage")) {
+      FetchRemoteStorageResponse response;
+      StatusCode status = StatusCode.SUCCESS;
+      String appId = request.getAppId();
+      auditContext.setAppId(appId);
+      try {
+        RemoteStorage.Builder rsBuilder = RemoteStorage.newBuilder();
+        RemoteStorageInfo rsInfo =
+            coordinatorServer.getApplicationManager().pickRemoteStorage(appId);
+        if (rsInfo == null) {
+          LOG.error("Remote storage of {} do not exist.", appId);
+        } else {
+          rsBuilder.setPath(rsInfo.getPath());
+          for (Map.Entry<String, String> entry : 
rsInfo.getConfItems().entrySet()) {
+            rsBuilder.addRemoteStorageConf(
+                RemoteStorageConfItem.newBuilder()
+                    .setKey(entry.getKey())
+                    .setValue(entry.getValue())
+                    .build());
+          }
         }
+        response =
+            FetchRemoteStorageResponse.newBuilder()
+                .setStatus(status)
+                .setRemoteStorage(rsBuilder.build())
+                .build();
+      } catch (Exception e) {
+        status = StatusCode.INTERNAL_ERROR;
+        response = 
FetchRemoteStorageResponse.newBuilder().setStatus(status).build();
+        LOG.error("Error happened when get remote storage for appId[{}]", 
appId, e);
       }
-      response =
-          FetchRemoteStorageResponse.newBuilder()
-              .setStatus(status)
-              .setRemoteStorage(rsBuilder.build())
-              .build();
-    } catch (Exception e) {
-      status = StatusCode.INTERNAL_ERROR;
-      response = 
FetchRemoteStorageResponse.newBuilder().setStatus(status).build();
-      LOG.error("Error happened when get remote storage for appId[{}]", appId, 
e);
-    }
 
-    if (Context.current().isCancelled()) {
-      responseObserver.onError(
-          Status.CANCELLED.withDescription("Cancelled by 
client").asRuntimeException());
-      LOG.warn("Fetch client conf cancelled by client for after deadline.");
-      return;
-    }
+      if (Context.current().isCancelled()) {
+        responseObserver.onError(
+            Status.CANCELLED.withDescription("Cancelled by 
client").asRuntimeException());
+        auditContext.setStatusCode("CANCELLED");
+        LOG.warn("Fetch client conf cancelled by client for after deadline.");
+        return;
+      }
 
-    responseObserver.onNext(response);
-    responseObserver.onCompleted();
+      auditContext.setStatusCode(response.getStatus());
+      responseObserver.onNext(response);
+      responseObserver.onCompleted();
+    }
   }
 
   private void logAssignmentResult(String appId, int shuffleId, 
PartitionRangeAssignment pra) {
@@ -437,4 +507,23 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
         request.getServerId().getJettyPort(),
         request.getStartTimeMs());
   }
+
+  /**
+   * Creates a {@link CoordinatorRPCAuditContext} instance.
+   *
+   * @param command the command to be logged by this {@link AuditContext}
+   * @return newly-created {@link CoordinatorRPCAuditContext} instance
+   */
+  private CoordinatorRPCAuditContext createAuditContext(String command) {
+    // Audit log may be enabled during runtime
+    Logger auditLogger = null;
+    if (isRpcAuditLogEnabled) {
+      auditLogger = AUDIT_LOGGER;
+    }
+    CoordinatorRPCAuditContext auditContext = new 
CoordinatorRPCAuditContext(auditLogger);
+    if (auditLogger != null) {
+      auditContext.setCommand(command).setCreationTimeNs(System.nanoTime());
+    }
+    return auditContext;
+  }
 }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRPCAuditContext.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRPCAuditContext.java
new file mode 100644
index 000000000..7a6453bfb
--- /dev/null
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/audit/CoordinatorRPCAuditContext.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.audit;
+
+import org.slf4j.Logger;
+
+import org.apache.uniffle.common.audit.AuditContext;
+import org.apache.uniffle.common.rpc.StatusCode;
+
+/** An audit context for coordinator rpc; {@link #close()} writes the collected fields. */
+public class CoordinatorRPCAuditContext implements AuditContext {
+  private final Logger log; // may be null, in which case close() logs nothing
+  private String command;
+  private String statusCode;
+  private long creationTimeNs;
+  private long executionTimeNs; // computed in close()
+  private String appId = "N/A";
+  private int shuffleId = -1;
+  private String args; // optional; left out of the record when null
+
+  /**
+   * Constructor of {@link CoordinatorRPCAuditContext}.
+   *
+   * @param log the logger that receives the audit record; if null, {@link #close()} is a no-op
+   */
+  public CoordinatorRPCAuditContext(Logger log) {
+    this.log = log;
+  }
+
+  /**
+   * Sets the command field.
+   *
+   * @param command the command associated with the coordinator rpc
+   * @return this {@link AuditContext} instance
+   */
+  public CoordinatorRPCAuditContext setCommand(String command) {
+    this.command = command;
+    return this;
+  }
+
+  /**
+   * Sets the creationTimeNs field.
+   *
+   * @param creationTimeNs the {@link System#nanoTime()} reading taken when the operation was
+   *     created; it can only be used to compute the execution time in {@link #close()}
+   * @return this {@link AuditContext} instance
+   */
+  public CoordinatorRPCAuditContext setCreationTimeNs(long creationTimeNs) {
+    this.creationTimeNs = creationTimeNs;
+    return this;
+  }
+
+  /**
+   * Sets the status code field from a common {@link StatusCode}.
+   *
+   * @param statusCode the status code; null is recorded as "UNKNOWN"
+   * @return this {@link AuditContext} instance
+   */
+  public CoordinatorRPCAuditContext setStatusCode(StatusCode statusCode) {
+    if (statusCode == null) {
+      this.statusCode = "UNKNOWN";
+    } else {
+      this.statusCode = statusCode.name();
+    }
+    return this;
+  }
+
+  /**
+   * Sets the status code field from a protobuf status code.
+   *
+   * @param statusCode the protobuf status code; null is recorded as "UNKNOWN"
+   * @return this {@link AuditContext} instance
+   */
+  public CoordinatorRPCAuditContext setStatusCode(
+      org.apache.uniffle.proto.RssProtos.StatusCode statusCode) {
+    if (statusCode == null) {
+      this.statusCode = "UNKNOWN";
+    } else {
+      this.statusCode = statusCode.name();
+    }
+    return this;
+  }
+
+  /**
+   * Sets the status code field from a raw string.
+   *
+   * @param statusCode the status code text, stored as given (no null substitution)
+   * @return this {@link AuditContext} instance
+   */
+  public CoordinatorRPCAuditContext setStatusCode(String statusCode) {
+    this.statusCode = statusCode;
+    return this;
+  }
+
+  @Override
+  public void close() {
+    if (log == null) { // no logger supplied: nothing to record
+      return;
+    }
+    executionTimeNs = System.nanoTime() - creationTimeNs; // elapsed since the creation time
+    log.info(toString());
+  }
+
+  @Override
+  public String toString() {
+    String line =
+        String.format(
+            
"cmd=%s\tstatusCode=%s\tappId=%s\tshuffleId=%s\texecutionTimeUs=%d\t",
+            command, statusCode, appId, shuffleId, executionTimeNs / 1000); // ns -> us
+    if (args != null) { // args section is appended only when set
+      line += String.format("args{%s}", args);
+    }
+    return line;
+  }
+
+  public CoordinatorRPCAuditContext setAppId(String appId) {
+    this.appId = appId;
+    return this;
+  }
+
+  public CoordinatorRPCAuditContext setShuffleId(int shuffleId) {
+    this.shuffleId = shuffleId;
+    return this;
+  }
+
+  public CoordinatorRPCAuditContext setArgs(String args) {
+    this.args = args;
+    return this;
+  }
+}
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/util/CoordinatorUtils.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/util/CoordinatorUtils.java
index 453319320..c0c5aceb5 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/util/CoordinatorUtils.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/util/CoordinatorUtils.java
@@ -47,7 +47,10 @@ public class CoordinatorUtils {
       PartitionRangeAssignment pra) {
     List<RssProtos.PartitionRangeAssignment> praList = 
pra.convertToGrpcProto();
 
-    return 
GetShuffleAssignmentsResponse.newBuilder().addAllAssignments(praList).build();
+    return GetShuffleAssignmentsResponse.newBuilder()
+        .addAllAssignments(praList)
+        .setStatus(RssProtos.StatusCode.SUCCESS)
+        .build();
   }
 
   public static int nextIdx(int idx, int size) {
diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md
index e443494c1..464ce2934 100644
--- a/docs/coordinator_guide.md
+++ b/docs/coordinator_guide.md
@@ -82,33 +82,34 @@ This document will introduce how to deploy Uniffle 
coordinators.
 ## Configuration
 
 ### Common settings
-|Property Name|Default|        Description                                     
                                                                                
                                                                                
                                                        |
-|---|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-|rss.coordinator.server.heartbeat.timeout|30000| Timeout if can't get 
heartbeat from shuffle server                                                   
                                                                                
                                                                                
    |
-|rss.coordinator.server.periodic.output.interval.times|30| The periodic 
interval times of output alive nodes. The interval sec can be calculated by 
(rss.coordinator.server.heartbeat.timeout/3 * 
rss.coordinator.server.periodic.output.interval.times). Default output interval 
is 5min.                                          |
-|rss.coordinator.assignment.strategy|PARTITION_BALANCE| Strategy for assigning 
shuffle server, PARTITION_BALANCE should be used for workload balance           
                                                                                
                                                                                
  |
-|rss.coordinator.app.expired|60000| Application expired time (ms), the 
heartbeat interval should be less than it                                       
                                                                                
                                                                      |
-|rss.coordinator.shuffle.nodes.max|9| The max number of shuffle server when do 
the assignment                                                                  
                                                                                
                                                                |
-|rss.coordinator.dynamicClientConf.path|-| The path of configuration file 
which have default conf for rss client                                          
                                                                                
                                                                          |
-|rss.coordinator.exclude.nodes.file.path|-| The path of configuration file 
which have exclude nodes                                                        
                                                                                
                                                                          |
-|rss.coordinator.exclude.nodes.check.interval.ms|60000| Update interval (ms) 
for exclude nodes                                                               
                                                                                
                                                                                
    |
-|rss.coordinator.access.checkers|org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker|
 The access checkers will be used when the spark client use the 
DelegationShuffleManager, which will decide whether to use rss according to the 
result of the specified access checkers                                         
                                          |
-|rss.coordinator.access.loadChecker.memory.percentage|15.0| The minimal 
percentage of available memory percentage of a server                           
                                                                                
                                                                                
             |
-|rss.coordinator.dynamicClientConf.enabled|false| whether to enable dynamic 
client conf, which will be fetched by spark client                              
                                                                                
                                                                               |
-|rss.coordinator.dynamicClientConf.path|-| The dynamic client conf of this 
cluster and can be stored in HADOOP FS or local                                 
                                                                                
                                                                         |
-|rss.coordinator.dynamicClientConf.updateIntervalSec|120| The dynamic client 
conf update interval in seconds                                                 
                                                                                
                                                                                
      |
-|rss.coordinator.remote.storage.cluster.conf|-| Remote Storage Cluster related 
conf with format $clusterId,$key=$value, separated by ';'                       
                                                                                
                                                                          |
-|rss.rpc.server.port|-| RPC port for coordinator                               
                                                                                
                                                                                
                                                  |
-|rss.jetty.http.port|-| Http port for coordinator                              
                                                                                
                                                                                
                                                  |
-|rss.coordinator.remote.storage.select.strategy|APP_BALANCE| Strategy for 
selecting the remote path                                                       
                                                                                
                                                                                
            |
-|rss.coordinator.remote.storage.io.sample.schedule.time|60000| The time of 
scheduling the read and write time of the paths to obtain different HADOOP FS   
                                                                                
                                                                                
             |
-|rss.coordinator.remote.storage.io.sample.file.size|204800000| The size of the 
file that the scheduled thread reads and writes                                 
                                                                                
                                                                                
         |
-|rss.coordinator.remote.storage.io.sample.access.times|3| The number of times 
to read and write HADOOP FS files                                               
                                                                                
                                                                                
     |
-|rss.coordinator.startup-silent-period.enabled|false| Enable the 
startup-silent-period to reject the assignment requests for avoiding partial 
assignments. To avoid service interruption, this mechanism is disabled by 
default. Especially it's recommended to use in coordinator HA mode when 
restarting single coordinator. |
-|rss.coordinator.startup-silent-period.duration|20000| The waiting 
duration(ms) when conf of rss.coordinator.startup-silent-period.enabled is 
enabled.                                                                        
                                                                                
                  |
-|rss.coordinator.select.partition.strategy|CONTINUOUS| There are two 
strategies for selecting partitions: ROUND and CONTINUOUS. ROUND will poll to 
allocate partitions to ShuffleServer, and CONTINUOUS will try to allocate 
consecutive partitions to ShuffleServer, this feature can improve performance 
in AQE scenarios.    |
-|rss.metrics.reporter.class|-| The class of metrics reporter.                  
                                                                                
                                                                                
                                                         |
-|rss.reconfigure.interval.sec|5| Reconfigure check interval.                   
                                                                                
                                                                                
                                                           |
+| Property Name                                          | Default             
                                                   |    Description             
                                                                                
                                                                                
                                                                                
|
+|--------------------------------------------------------|------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| rss.coordinator.server.heartbeat.timeout               | 30000               
                                                   | Timeout if can't get 
heartbeat from shuffle server                                                   
                                                                                
                                                                                
    |
+| rss.coordinator.server.periodic.output.interval.times  | 30                  
                                                   | The periodic interval 
times of output alive nodes. The interval sec can be calculated by 
(rss.coordinator.server.heartbeat.timeout/3 * 
rss.coordinator.server.periodic.output.interval.times). Default output interval 
is 5min.                                          |
+| rss.coordinator.assignment.strategy                    | PARTITION_BALANCE   
                                                   | Strategy for assigning 
shuffle server, PARTITION_BALANCE should be used for workload balance           
                                                                                
                                                                                
  |
+| rss.coordinator.app.expired                            | 60000               
                                                   | Application expired time 
(ms), the heartbeat interval should be less than it                             
                                                                                
                                                                                
|
+| rss.coordinator.shuffle.nodes.max                      | 9                   
                                                   | The max number of shuffle 
server when do the assignment                                                   
                                                                                
                                                                               |
+| rss.coordinator.dynamicClientConf.path                 | -                   
                                                   | The path of configuration 
file which have default conf for rss client                                     
                                                                                
                                                                               |
+| rss.coordinator.exclude.nodes.file.path                | -                   
                                                   | The path of configuration 
file which have exclude nodes                                                   
                                                                                
                                                                               |
+| rss.coordinator.exclude.nodes.check.interval.ms        | 60000               
                                                   | Update interval (ms) for 
exclude nodes                                                                   
                                                                                
                                                                                
|
+| rss.coordinator.access.checkers                        | 
org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker | The 
access checkers will be used when the spark client use the 
DelegationShuffleManager, which will decide whether to use rss according to the 
result of the specified access checkers                                         
                                          |
+| rss.coordinator.access.loadChecker.memory.percentage   | 15.0                
                                                   | The minimal percentage of 
available memory percentage of a server                                         
                                                                                
                                                                               |
+| rss.coordinator.dynamicClientConf.enabled              | false               
                                                   | whether to enable dynamic 
client conf, which will be fetched by spark client                              
                                                                                
                                                                               |
+| rss.coordinator.dynamicClientConf.path                 | -                   
                                                   | The dynamic client conf of 
this cluster and can be stored in HADOOP FS or local                            
                                                                                
                                                                              |
+| rss.coordinator.dynamicClientConf.updateIntervalSec    | 120                 
                                                   | The dynamic client conf 
update interval in seconds                                                      
                                                                                
                                                                                
 |
+| rss.coordinator.remote.storage.cluster.conf            | -                   
                                                   | Remote Storage Cluster 
related conf with format $clusterId,$key=$value, separated by ';'               
                                                                                
                                                                                
  |
+| rss.rpc.server.port                                    | -                   
                                                   | RPC port for coordinator   
                                                                                
                                                                                
                                                                              |
+| rss.jetty.http.port                                    | -                   
                                                   | Http port for coordinator  
                                                                                
                                                                                
                                                                              |
+| rss.coordinator.remote.storage.select.strategy         | APP_BALANCE         
                                                   | Strategy for selecting the 
remote path                                                                     
                                                                                
                                                                              |
+| rss.coordinator.remote.storage.io.sample.schedule.time | 60000               
                                                   | The time of scheduling the 
read and write time of the paths to obtain different HADOOP FS                  
                                                                                
                                                                              |
+| rss.coordinator.remote.storage.io.sample.file.size     | 204800000           
                                                   | The size of the file that 
the scheduled thread reads and writes                                           
                                                                                
                                                                               |
+| rss.coordinator.remote.storage.io.sample.access.times  | 3                   
                                                   | The number of times to 
read and write HADOOP FS files                                                  
                                                                                
                                                                                
  |
+| rss.coordinator.startup-silent-period.enabled          | false               
                                                   | Enable the 
startup-silent-period to reject the assignment requests for avoiding partial 
assignments. To avoid service interruption, this mechanism is disabled by 
default. Especially it's recommended to use in coordinator HA mode when 
restarting single coordinator. |
+| rss.coordinator.startup-silent-period.duration         | 20000               
                                                   | The waiting duration(ms) 
when conf of rss.coordinator.startup-silent-period.enabled is enabled.          
                                                                                
                                                                                
|
+| rss.coordinator.select.partition.strategy              | CONTINUOUS          
                                                   | There are two strategies 
for selecting partitions: ROUND and CONTINUOUS. ROUND will poll to allocate 
partitions to ShuffleServer, and CONTINUOUS will try to allocate consecutive 
partitions to ShuffleServer, this feature can improve performance in AQE 
scenarios.    |
+| rss.metrics.reporter.class                             | -                   
                                                   | The class of metrics 
reporter.                                                                       
                                                                                
                                                                                
    |
+| rss.reconfigure.interval.sec                           | 5                   
                                                   | Reconfigure check 
interval.                                                                       
                                                                                
                                                                                
       |
+| rss.coordinator.rpc.audit.log.enabled                  | true                
                                                   | When set to true, for 
auditing purposes, the coordinator will log audit records for every rpc request 
operation.                                                                      
                                                                                
   |
 
 ### AccessClusterLoadChecker settings
 |Property Name|Default|        Description|

Reply via email to