This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 88b88fffb1 HDDS-11324. Negative value preOpLatencyMs in DN audit log 
(#7093)
88b88fffb1 is described below

commit 88b88fffb1816db0c686060aa786c71ce7098138
Author: Sammi Chen <[email protected]>
AuthorDate: Tue Aug 20 01:43:52 2024 +0800

    HDDS-11324. Negative value preOpLatencyMs in DN audit log (#7093)
---
 .../container/common/impl/HddsDispatcher.java      | 65 +++++++++++++---------
 .../org/apache/hadoop/ozone/audit/AuditLogger.java | 10 +++-
 2 files changed, 47 insertions(+), 28 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index cac4df73cc..c5855b38b7 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -96,7 +96,9 @@ public class HddsDispatcher implements ContainerDispatcher, 
Auditor {
   private static final String AUDIT_PARAM_FORCE_DELETE = "forceDelete";
   private static final String AUDIT_PARAM_START_CONTAINER_ID = 
"startContainerID";
   private static final String AUDIT_PARAM_BLOCK_DATA = "blockData";
-  private static final String AUDIT_PARAM_BLOCK_DATA_SIZE = "blockDataSize";
+  private static final String AUDIT_PARAM_BLOCK_DATA_OFFSET = "offset";
+  private static final String AUDIT_PARAM_BLOCK_DATA_SIZE = "size";
+  private static final String AUDIT_PARAM_BLOCK_DATA_STAGE = "stage";
   private static final String AUDIT_PARAM_COUNT = "count";
   private static final String AUDIT_PARAM_START_LOCAL_ID = "startLocalID";
   private static final String AUDIT_PARAM_PREV_CHUNKNAME = "prevChunkName";
@@ -112,7 +114,7 @@ public class HddsDispatcher implements ContainerDispatcher, 
Auditor {
   private String clusterId;
   private ContainerMetrics metrics;
   private final TokenVerifier tokenVerifier;
-  private long slowOpThresholdMs;
+  private long slowOpThresholdNs;
   private VolumeUsage.MinFreeSpaceCalculator freeSpaceCalculator;
 
   /**
@@ -134,7 +136,7 @@ public class HddsDispatcher implements ContainerDispatcher, 
Auditor {
         HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
     this.tokenVerifier = tokenVerifier != null ? tokenVerifier
         : new NoopTokenVerifier();
-    this.slowOpThresholdMs = getSlowOpThresholdMs(conf);
+    this.slowOpThresholdNs = getSlowOpThresholdMs(conf) * 1000000;
 
     protocolMetrics =
         new ProtocolMessageMetrics<>(
@@ -279,7 +281,7 @@ public class HddsDispatcher implements ContainerDispatcher, 
Auditor {
           "ContainerID " + containerID
               + " has been lost and cannot be recreated on this DataNode",
           ContainerProtos.Result.CONTAINER_MISSING);
-      audit(action, eventType, msg, AuditEventStatus.FAILURE, sce);
+      audit(action, eventType, msg, dispatcherContext, 
AuditEventStatus.FAILURE, sce);
       return ContainerUtils.logAndReturnError(LOG, sce, msg);
     }
 
@@ -306,7 +308,7 @@ public class HddsDispatcher implements ContainerDispatcher, 
Auditor {
           StorageContainerException sce = new StorageContainerException(
               "ContainerID " + containerID + " creation failed",
               responseProto.getResult());
-          audit(action, eventType, msg, AuditEventStatus.FAILURE, sce);
+          audit(action, eventType, msg, dispatcherContext, 
AuditEventStatus.FAILURE, sce);
           return ContainerUtils.logAndReturnError(LOG, sce, msg);
         }
         Preconditions.checkArgument(isWriteStage && container2BCSIDMap != null
@@ -325,13 +327,13 @@ public class HddsDispatcher implements 
ContainerDispatcher, Auditor {
         StorageContainerException sce = new StorageContainerException(
             "ContainerID " + containerID + " does not exist",
             ContainerProtos.Result.CONTAINER_NOT_FOUND);
-        audit(action, eventType, msg, AuditEventStatus.FAILURE, sce);
+        audit(action, eventType, msg, dispatcherContext, 
AuditEventStatus.FAILURE, sce);
         return ContainerUtils.logAndReturnError(LOG, sce, msg);
       }
       containerType = getContainerType(container);
     } else {
       if (!msg.hasCreateContainer()) {
-        audit(action, eventType, msg, AuditEventStatus.FAILURE,
+        audit(action, eventType, msg, dispatcherContext, 
AuditEventStatus.FAILURE,
             new Exception("MALFORMED_REQUEST"));
         return malformedRequest(msg);
       }
@@ -348,10 +350,10 @@ public class HddsDispatcher implements 
ContainerDispatcher, Auditor {
           "ContainerType " + containerType,
           ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
       // log failure
-      audit(action, eventType, msg, AuditEventStatus.FAILURE, ex);
+      audit(action, eventType, msg, dispatcherContext, 
AuditEventStatus.FAILURE, ex);
       return ContainerUtils.logAndReturnError(LOG, ex, msg);
     }
-    perf.appendPreOpLatencyMs(Time.monotonicNow() - startTime);
+    perf.appendPreOpLatencyNano(Time.monotonicNowNanos() - startTime);
     responseProto = handler.handle(msg, container, dispatcherContext);
     long opLatencyNs = Time.monotonicNowNanos() - startTime;
     if (responseProto != null) {
@@ -417,7 +419,7 @@ public class HddsDispatcher implements ContainerDispatcher, 
Auditor {
       }
       if (result == Result.SUCCESS) {
         updateBCSID(container, dispatcherContext, cmdType);
-        audit(action, eventType, msg, AuditEventStatus.SUCCESS, null);
+        audit(action, eventType, msg, dispatcherContext, 
AuditEventStatus.SUCCESS, null);
       } else {
         //TODO HDDS-7096:
         // This is a too general place for on demand scanning.
@@ -425,16 +427,16 @@ public class HddsDispatcher implements 
ContainerDispatcher, Auditor {
         // and move this general scan to where it is more appropriate.
         // Add integration tests to test the full functionality.
         OnDemandContainerDataScanner.scanContainer(container);
-        audit(action, eventType, msg, AuditEventStatus.FAILURE,
+        audit(action, eventType, msg, dispatcherContext, 
AuditEventStatus.FAILURE,
             new Exception(responseProto.getMessage()));
       }
-      perf.appendOpLatencyMs(opLatencyNs);
-      performanceAudit(action, msg, perf, opLatencyNs);
+      perf.appendOpLatencyNanos(opLatencyNs);
+      performanceAudit(action, msg, dispatcherContext, perf, opLatencyNs);
 
       return responseProto;
     } else {
       // log failure
-      audit(action, eventType, msg, AuditEventStatus.FAILURE,
+      audit(action, eventType, msg, dispatcherContext, 
AuditEventStatus.FAILURE,
           new Exception("UNSUPPORTED_REQUEST"));
       return unsupportedRequest(msg);
     }
@@ -547,7 +549,7 @@ public class HddsDispatcher implements ContainerDispatcher, 
Auditor {
       StorageContainerException ex = new StorageContainerException(
           "Invalid ContainerType " + containerType,
           ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
-      audit(action, eventType, msg, AuditEventStatus.FAILURE, ex);
+      audit(action, eventType, msg, null, AuditEventStatus.FAILURE, ex);
       throw ex;
     }
 
@@ -567,12 +569,12 @@ public class HddsDispatcher implements 
ContainerDispatcher, Auditor {
         // if the container is not open/recovering, no updates can happen. Just
         // throw an exception
         ContainerNotOpenException cex = new ContainerNotOpenException(log);
-        audit(action, eventType, msg, AuditEventStatus.FAILURE, cex);
+        audit(action, eventType, msg, null, AuditEventStatus.FAILURE, cex);
         throw cex;
       }
     } else if (HddsUtils.isReadOnly(msg) && containerState == State.INVALID) {
       InvalidContainerStateException iex = new 
InvalidContainerStateException(log);
-      audit(action, eventType, msg, AuditEventStatus.FAILURE, iex);
+      audit(action, eventType, msg, null, AuditEventStatus.FAILURE, iex);
       throw iex;
     }
   }
@@ -678,14 +680,14 @@ public class HddsDispatcher implements 
ContainerDispatcher, Auditor {
   }
 
   private void audit(AuditAction action, EventType eventType,
-      ContainerCommandRequestProto msg, AuditEventStatus result,
-      Throwable exception) {
+      ContainerCommandRequestProto msg, DispatcherContext dispatcherContext,
+      AuditEventStatus result, Throwable exception) {
     Map<String, String> params;
     AuditMessage amsg;
     switch (result) {
     case SUCCESS:
       if (isAllowed(action.getAction())) {
-        params = getAuditParams(msg);
+        params = getAuditParams(msg, dispatcherContext);
         if (eventType == EventType.READ &&
             AUDIT.getLogger().isInfoEnabled(AuditMarker.READ.getMarker())) {
           amsg = buildAuditMessageForSuccess(action, params);
@@ -699,7 +701,7 @@ public class HddsDispatcher implements ContainerDispatcher, 
Auditor {
       break;
 
     case FAILURE:
-      params = getAuditParams(msg);
+      params = getAuditParams(msg, dispatcherContext);
       if (eventType == EventType.READ &&
           AUDIT.getLogger().isErrorEnabled(AuditMarker.READ.getMarker())) {
         amsg = buildAuditMessageForFailure(action, params, exception);
@@ -719,9 +721,9 @@ public class HddsDispatcher implements ContainerDispatcher, 
Auditor {
   }
 
   private void performanceAudit(AuditAction action, 
ContainerCommandRequestProto msg,
-      PerformanceStringBuilder performance, long opLatencyMs) {
-    if (isOperationSlow(opLatencyMs)) {
-      Map<String, String> params = getAuditParams(msg);
+      DispatcherContext dispatcherContext, PerformanceStringBuilder 
performance, long opLatencyNs) {
+    if (isOperationSlow(opLatencyNs)) {
+      Map<String, String> params = getAuditParams(msg, dispatcherContext);
       AuditMessage auditMessage =
           buildAuditMessageForPerformance(action, params, performance);
       AUDIT.logPerformance(auditMessage);
@@ -837,7 +839,7 @@ public class HddsDispatcher implements ContainerDispatcher, 
Auditor {
   }
 
   private static Map<String, String> getAuditParams(
-      ContainerCommandRequestProto msg) {
+      ContainerCommandRequestProto msg, DispatcherContext dispatcherContext) {
     Map<String, String> auditParams = new TreeMap<>();
     Type cmdType = msg.getCmdType();
     String containerID = String.valueOf(msg.getContainerID());
@@ -904,6 +906,8 @@ public class HddsDispatcher implements ContainerDispatcher, 
Auditor {
     case ReadChunk:
       auditParams.put(AUDIT_PARAM_BLOCK_DATA,
           BlockID.getFromProtobuf(msg.getReadChunk().getBlockID()).toString());
+      auditParams.put(AUDIT_PARAM_BLOCK_DATA_OFFSET,
+          String.valueOf(msg.getReadChunk().getChunkData().getOffset()));
       auditParams.put(AUDIT_PARAM_BLOCK_DATA_SIZE,
           String.valueOf(msg.getReadChunk().getChunkData().getLen()));
       return auditParams;
@@ -918,8 +922,13 @@ public class HddsDispatcher implements 
ContainerDispatcher, Auditor {
       auditParams.put(AUDIT_PARAM_BLOCK_DATA,
           BlockID.getFromProtobuf(msg.getWriteChunk().getBlockID())
               .toString());
+      auditParams.put(AUDIT_PARAM_BLOCK_DATA_OFFSET,
+          String.valueOf(msg.getWriteChunk().getChunkData().getOffset()));
       auditParams.put(AUDIT_PARAM_BLOCK_DATA_SIZE,
           String.valueOf(msg.getWriteChunk().getChunkData().getLen()));
+      if (dispatcherContext != null && dispatcherContext.getStage() != null) {
+        auditParams.put(AUDIT_PARAM_BLOCK_DATA_STAGE, 
dispatcherContext.getStage().toString());
+      }
       return auditParams;
 
     case ListChunk:
@@ -936,6 +945,8 @@ public class HddsDispatcher implements ContainerDispatcher, 
Auditor {
         auditParams.put(AUDIT_PARAM_BLOCK_DATA,
             BlockData.getFromProtoBuf(msg.getPutSmallFile()
                 .getBlock().getBlockData()).toString());
+        auditParams.put(AUDIT_PARAM_BLOCK_DATA_OFFSET,
+            String.valueOf(msg.getPutSmallFile().getChunkInfo().getOffset()));
         auditParams.put(AUDIT_PARAM_BLOCK_DATA_SIZE,
             String.valueOf(msg.getPutSmallFile().getChunkInfo().getLen()));
       } catch (IOException ex) {
@@ -975,7 +986,7 @@ public class HddsDispatcher implements ContainerDispatcher, 
Auditor {
 
   }
 
-  private boolean isOperationSlow(long opLatencyMs) {
-    return opLatencyMs >= slowOpThresholdMs;
+  private boolean isOperationSlow(long opLatencyNs) {
+    return opLatencyNs >= slowOpThresholdNs;
   }
 }
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/audit/AuditLogger.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/audit/AuditLogger.java
index 042887e4e5..5e8996df17 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/audit/AuditLogger.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/audit/AuditLogger.java
@@ -168,12 +168,20 @@ public class AuditLogger {
 
     /**
      * Appends pre-operation operation latency in milliseconds.
-     * @param millis Latency in nanoseconds.
+     * @param millis Latency in milliseconds.
      */
     public void appendPreOpLatencyMs(long millis) {
       append("preOpLatencyMs", millis);
     }
 
+    /**
+     * Appends pre-operation latency, converted from nanoseconds to milliseconds.
+     * @param nanos Latency in nanoseconds.
+     */
+    public void appendPreOpLatencyNano(long nanos) {
+      append("preOpLatencyMs", TimeUnit.NANOSECONDS.toMillis(nanos));
+    }
+
     /**
      * Appends whole operation latency in milliseconds.
      * @param millis Latency in milliseconds.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to