This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 88b88fffb1 HDDS-11324. Negative value preOpLatencyMs in DN audit log
(#7093)
88b88fffb1 is described below
commit 88b88fffb1816db0c686060aa786c71ce7098138
Author: Sammi Chen <[email protected]>
AuthorDate: Tue Aug 20 01:43:52 2024 +0800
HDDS-11324. Negative value preOpLatencyMs in DN audit log (#7093)
---
.../container/common/impl/HddsDispatcher.java | 65 +++++++++++++---------
.../org/apache/hadoop/ozone/audit/AuditLogger.java | 10 +++-
2 files changed, 47 insertions(+), 28 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index cac4df73cc..c5855b38b7 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -96,7 +96,9 @@ public class HddsDispatcher implements ContainerDispatcher,
Auditor {
private static final String AUDIT_PARAM_FORCE_DELETE = "forceDelete";
private static final String AUDIT_PARAM_START_CONTAINER_ID =
"startContainerID";
private static final String AUDIT_PARAM_BLOCK_DATA = "blockData";
- private static final String AUDIT_PARAM_BLOCK_DATA_SIZE = "blockDataSize";
+ private static final String AUDIT_PARAM_BLOCK_DATA_OFFSET = "offset";
+ private static final String AUDIT_PARAM_BLOCK_DATA_SIZE = "size";
+ private static final String AUDIT_PARAM_BLOCK_DATA_STAGE = "stage";
private static final String AUDIT_PARAM_COUNT = "count";
private static final String AUDIT_PARAM_START_LOCAL_ID = "startLocalID";
private static final String AUDIT_PARAM_PREV_CHUNKNAME = "prevChunkName";
@@ -112,7 +114,7 @@ public class HddsDispatcher implements ContainerDispatcher,
Auditor {
private String clusterId;
private ContainerMetrics metrics;
private final TokenVerifier tokenVerifier;
- private long slowOpThresholdMs;
+ private long slowOpThresholdNs;
private VolumeUsage.MinFreeSpaceCalculator freeSpaceCalculator;
/**
@@ -134,7 +136,7 @@ public class HddsDispatcher implements ContainerDispatcher,
Auditor {
HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
this.tokenVerifier = tokenVerifier != null ? tokenVerifier
: new NoopTokenVerifier();
- this.slowOpThresholdMs = getSlowOpThresholdMs(conf);
+ this.slowOpThresholdNs = getSlowOpThresholdMs(conf) * 1000000;
protocolMetrics =
new ProtocolMessageMetrics<>(
@@ -279,7 +281,7 @@ public class HddsDispatcher implements ContainerDispatcher,
Auditor {
"ContainerID " + containerID
+ " has been lost and cannot be recreated on this DataNode",
ContainerProtos.Result.CONTAINER_MISSING);
- audit(action, eventType, msg, AuditEventStatus.FAILURE, sce);
+ audit(action, eventType, msg, dispatcherContext,
AuditEventStatus.FAILURE, sce);
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}
@@ -306,7 +308,7 @@ public class HddsDispatcher implements ContainerDispatcher,
Auditor {
StorageContainerException sce = new StorageContainerException(
"ContainerID " + containerID + " creation failed",
responseProto.getResult());
- audit(action, eventType, msg, AuditEventStatus.FAILURE, sce);
+ audit(action, eventType, msg, dispatcherContext,
AuditEventStatus.FAILURE, sce);
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}
Preconditions.checkArgument(isWriteStage && container2BCSIDMap != null
@@ -325,13 +327,13 @@ public class HddsDispatcher implements
ContainerDispatcher, Auditor {
StorageContainerException sce = new StorageContainerException(
"ContainerID " + containerID + " does not exist",
ContainerProtos.Result.CONTAINER_NOT_FOUND);
- audit(action, eventType, msg, AuditEventStatus.FAILURE, sce);
+ audit(action, eventType, msg, dispatcherContext,
AuditEventStatus.FAILURE, sce);
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}
containerType = getContainerType(container);
} else {
if (!msg.hasCreateContainer()) {
- audit(action, eventType, msg, AuditEventStatus.FAILURE,
+ audit(action, eventType, msg, dispatcherContext,
AuditEventStatus.FAILURE,
new Exception("MALFORMED_REQUEST"));
return malformedRequest(msg);
}
@@ -348,10 +350,10 @@ public class HddsDispatcher implements
ContainerDispatcher, Auditor {
"ContainerType " + containerType,
ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
// log failure
- audit(action, eventType, msg, AuditEventStatus.FAILURE, ex);
+ audit(action, eventType, msg, dispatcherContext,
AuditEventStatus.FAILURE, ex);
return ContainerUtils.logAndReturnError(LOG, ex, msg);
}
- perf.appendPreOpLatencyMs(Time.monotonicNow() - startTime);
+ perf.appendPreOpLatencyNano(Time.monotonicNowNanos() - startTime);
responseProto = handler.handle(msg, container, dispatcherContext);
long opLatencyNs = Time.monotonicNowNanos() - startTime;
if (responseProto != null) {
@@ -417,7 +419,7 @@ public class HddsDispatcher implements ContainerDispatcher,
Auditor {
}
if (result == Result.SUCCESS) {
updateBCSID(container, dispatcherContext, cmdType);
- audit(action, eventType, msg, AuditEventStatus.SUCCESS, null);
+ audit(action, eventType, msg, dispatcherContext,
AuditEventStatus.SUCCESS, null);
} else {
//TODO HDDS-7096:
// This is a too general place for on demand scanning.
@@ -425,16 +427,16 @@ public class HddsDispatcher implements
ContainerDispatcher, Auditor {
// and move this general scan to where it is more appropriate.
// Add integration tests to test the full functionality.
OnDemandContainerDataScanner.scanContainer(container);
- audit(action, eventType, msg, AuditEventStatus.FAILURE,
+ audit(action, eventType, msg, dispatcherContext,
AuditEventStatus.FAILURE,
new Exception(responseProto.getMessage()));
}
- perf.appendOpLatencyMs(opLatencyNs);
- performanceAudit(action, msg, perf, opLatencyNs);
+ perf.appendOpLatencyNanos(opLatencyNs);
+ performanceAudit(action, msg, dispatcherContext, perf, opLatencyNs);
return responseProto;
} else {
// log failure
- audit(action, eventType, msg, AuditEventStatus.FAILURE,
+ audit(action, eventType, msg, dispatcherContext,
AuditEventStatus.FAILURE,
new Exception("UNSUPPORTED_REQUEST"));
return unsupportedRequest(msg);
}
@@ -547,7 +549,7 @@ public class HddsDispatcher implements ContainerDispatcher,
Auditor {
StorageContainerException ex = new StorageContainerException(
"Invalid ContainerType " + containerType,
ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
- audit(action, eventType, msg, AuditEventStatus.FAILURE, ex);
+ audit(action, eventType, msg, null, AuditEventStatus.FAILURE, ex);
throw ex;
}
@@ -567,12 +569,12 @@ public class HddsDispatcher implements
ContainerDispatcher, Auditor {
// if the container is not open/recovering, no updates can happen. Just
// throw an exception
ContainerNotOpenException cex = new ContainerNotOpenException(log);
- audit(action, eventType, msg, AuditEventStatus.FAILURE, cex);
+ audit(action, eventType, msg, null, AuditEventStatus.FAILURE, cex);
throw cex;
}
} else if (HddsUtils.isReadOnly(msg) && containerState == State.INVALID) {
InvalidContainerStateException iex = new
InvalidContainerStateException(log);
- audit(action, eventType, msg, AuditEventStatus.FAILURE, iex);
+ audit(action, eventType, msg, null, AuditEventStatus.FAILURE, iex);
throw iex;
}
}
@@ -678,14 +680,14 @@ public class HddsDispatcher implements
ContainerDispatcher, Auditor {
}
private void audit(AuditAction action, EventType eventType,
- ContainerCommandRequestProto msg, AuditEventStatus result,
- Throwable exception) {
+ ContainerCommandRequestProto msg, DispatcherContext dispatcherContext,
+ AuditEventStatus result, Throwable exception) {
Map<String, String> params;
AuditMessage amsg;
switch (result) {
case SUCCESS:
if (isAllowed(action.getAction())) {
- params = getAuditParams(msg);
+ params = getAuditParams(msg, dispatcherContext);
if (eventType == EventType.READ &&
AUDIT.getLogger().isInfoEnabled(AuditMarker.READ.getMarker())) {
amsg = buildAuditMessageForSuccess(action, params);
@@ -699,7 +701,7 @@ public class HddsDispatcher implements ContainerDispatcher,
Auditor {
break;
case FAILURE:
- params = getAuditParams(msg);
+ params = getAuditParams(msg, dispatcherContext);
if (eventType == EventType.READ &&
AUDIT.getLogger().isErrorEnabled(AuditMarker.READ.getMarker())) {
amsg = buildAuditMessageForFailure(action, params, exception);
@@ -719,9 +721,9 @@ public class HddsDispatcher implements ContainerDispatcher,
Auditor {
}
private void performanceAudit(AuditAction action,
ContainerCommandRequestProto msg,
- PerformanceStringBuilder performance, long opLatencyMs) {
- if (isOperationSlow(opLatencyMs)) {
- Map<String, String> params = getAuditParams(msg);
+ DispatcherContext dispatcherContext, PerformanceStringBuilder
performance, long opLatencyNs) {
+ if (isOperationSlow(opLatencyNs)) {
+ Map<String, String> params = getAuditParams(msg, dispatcherContext);
AuditMessage auditMessage =
buildAuditMessageForPerformance(action, params, performance);
AUDIT.logPerformance(auditMessage);
@@ -837,7 +839,7 @@ public class HddsDispatcher implements ContainerDispatcher,
Auditor {
}
private static Map<String, String> getAuditParams(
- ContainerCommandRequestProto msg) {
+ ContainerCommandRequestProto msg, DispatcherContext dispatcherContext) {
Map<String, String> auditParams = new TreeMap<>();
Type cmdType = msg.getCmdType();
String containerID = String.valueOf(msg.getContainerID());
@@ -904,6 +906,8 @@ public class HddsDispatcher implements ContainerDispatcher,
Auditor {
case ReadChunk:
auditParams.put(AUDIT_PARAM_BLOCK_DATA,
BlockID.getFromProtobuf(msg.getReadChunk().getBlockID()).toString());
+ auditParams.put(AUDIT_PARAM_BLOCK_DATA_OFFSET,
+ String.valueOf(msg.getReadChunk().getChunkData().getOffset()));
auditParams.put(AUDIT_PARAM_BLOCK_DATA_SIZE,
String.valueOf(msg.getReadChunk().getChunkData().getLen()));
return auditParams;
@@ -918,8 +922,13 @@ public class HddsDispatcher implements
ContainerDispatcher, Auditor {
auditParams.put(AUDIT_PARAM_BLOCK_DATA,
BlockID.getFromProtobuf(msg.getWriteChunk().getBlockID())
.toString());
+ auditParams.put(AUDIT_PARAM_BLOCK_DATA_OFFSET,
+ String.valueOf(msg.getWriteChunk().getChunkData().getOffset()));
auditParams.put(AUDIT_PARAM_BLOCK_DATA_SIZE,
String.valueOf(msg.getWriteChunk().getChunkData().getLen()));
+ if (dispatcherContext != null && dispatcherContext.getStage() != null) {
+ auditParams.put(AUDIT_PARAM_BLOCK_DATA_STAGE,
dispatcherContext.getStage().toString());
+ }
return auditParams;
case ListChunk:
@@ -936,6 +945,8 @@ public class HddsDispatcher implements ContainerDispatcher,
Auditor {
auditParams.put(AUDIT_PARAM_BLOCK_DATA,
BlockData.getFromProtoBuf(msg.getPutSmallFile()
.getBlock().getBlockData()).toString());
+ auditParams.put(AUDIT_PARAM_BLOCK_DATA_OFFSET,
+ String.valueOf(msg.getPutSmallFile().getChunkInfo().getOffset()));
auditParams.put(AUDIT_PARAM_BLOCK_DATA_SIZE,
String.valueOf(msg.getPutSmallFile().getChunkInfo().getLen()));
} catch (IOException ex) {
@@ -975,7 +986,7 @@ public class HddsDispatcher implements ContainerDispatcher,
Auditor {
}
- private boolean isOperationSlow(long opLatencyMs) {
- return opLatencyMs >= slowOpThresholdMs;
+ private boolean isOperationSlow(long opLatencyNs) {
+ return opLatencyNs >= slowOpThresholdNs;
}
}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/audit/AuditLogger.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/audit/AuditLogger.java
index 042887e4e5..5e8996df17 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/audit/AuditLogger.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/audit/AuditLogger.java
@@ -168,12 +168,20 @@ public class AuditLogger {
/**
* Appends pre-operation latency in milliseconds.
- * @param millis Latency in nanoseconds.
+ * @param millis Latency in milliseconds.
*/
public void appendPreOpLatencyMs(long millis) {
append("preOpLatencyMs", millis);
}
+ /**
* Appends pre-operation latency, converting the given nanoseconds to milliseconds.
+ * @param nanos Latency in nanoseconds.
+ */
+ public void appendPreOpLatencyNano(long nanos) {
+ append("preOpLatencyMs", TimeUnit.NANOSECONDS.toMillis(nanos));
+ }
+
/**
* Appends whole operation latency in milliseconds.
* @param millis Latency in milliseconds.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]