This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new b59366dcea HDDS-9718. Add performance audit logging for S3G (#5644)
b59366dcea is described below
commit b59366dcead37b3c0ed96255652a7b7471dc0971
Author: XiChen <[email protected]>
AuthorDate: Tue Nov 28 18:39:24 2023 +0800
HDDS-9718. Add performance audit logging for S3G (#5644)
---
.../org/apache/hadoop/ozone/audit/AuditLogger.java | 72 ++++++++++++
.../apache/hadoop/ozone/audit/AuditMessage.java | 22 +++-
.../org/apache/hadoop/ozone/audit/S3GAction.java | 1 +
.../hadoop/ozone/s3/endpoint/BucketEndpoint.java | 12 +-
.../hadoop/ozone/s3/endpoint/EndpointBase.java | 8 ++
.../hadoop/ozone/s3/endpoint/ObjectEndpoint.java | 122 +++++++++++++--------
.../ozone/s3/endpoint/ObjectEndpointStreaming.java | 32 ++++--
.../hadoop/ozone/s3/metrics/S3GatewayMetrics.java | 43 ++++----
8 files changed, 233 insertions(+), 79 deletions(-)
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/audit/AuditLogger.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/audit/AuditLogger.java
index b27fc78771..f4f8ba7853 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/audit/AuditLogger.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/audit/AuditLogger.java
@@ -30,6 +30,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -132,4 +133,75 @@ public class AuditLogger {
return debugCmdSetRef.get()
.contains(auditMessage.getOp().toLowerCase(Locale.ROOT));
}
+
+ /**
+ * Utility class for building performance log strings.
+ */
+ public static class PerformanceStringBuilder {
+ private final StringBuilder builder = new StringBuilder(128).append('{');
+ /**
+ * Appends metadata operation latency in milliseconds.
+ * @param nanos Latency in nanoseconds.
+ */
+ public void appendMetaLatencyNanos(long nanos) {
+ append("metaLatencyMs", TimeUnit.NANOSECONDS.toMillis(nanos));
+ }
+
+ /**
+ * Appends whole operation latency in milliseconds.
+ * @param nanos Latency in nanoseconds.
+ */
+ public void appendOpLatencyNanos(long nanos) {
+ append("opLatencyMs", TimeUnit.NANOSECONDS.toMillis(nanos));
+ }
+
+ /**
+ * Appends the size in bytes.
+ * @param bytes Size in bytes.
+ */
+ public void appendSizeBytes(long bytes) {
+ append("sizeByte", bytes);
+ }
+
+ /**
+ * Appends the count.
+ * @param count The count value to be appended.
+ */
+ public void appendCount(long count) {
+ append("count", count);
+ }
+
+ /**
+ * Appends a stream mode flag.
+ */
+ public void appendStreamMode() {
+ append("streamMode", "true");
+ }
+
+ private void append(String name, long value) {
+ append(name, String.valueOf(value));
+ }
+
+ /**
+ * Appends a name-value pair to the log string.
+ * @param name Name of the metric.
+ * @param value Value of the metric.
+ */
+ private void append(String name, String value) {
+ builder.append(name)
+ .append('=')
+ .append(value)
+ .append(", ");
+ }
+
+ public String build() {
+ final int length = builder.length();
+ if (length < 2) {
+ return "{}";
+ }
+ builder.setCharAt(length - 2, '}');
+ builder.setLength(length - 1);
+ return builder.toString();
+ }
+ }
}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/audit/AuditMessage.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/audit/AuditMessage.java
index 85fa7986b9..bff05f024d 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/audit/AuditMessage.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/audit/AuditMessage.java
@@ -34,13 +34,14 @@ public final class AuditMessage implements Message {
private final Throwable throwable;
private AuditMessage(String user, String ip, String op,
- Map<String, String> params, String ret, Throwable throwable) {
+ Map<String, String> params, String ret, Throwable throwable,
+ String performance) {
this.user = user;
this.ip = ip;
this.op = op;
this.params = params;
this.ret = ret;
- this.message = formMessage(user, ip, op, params, ret);
+ this.message = formMessage(user, ip, op, params, ret, performance);
this.throwable = throwable;
}
@@ -78,6 +79,7 @@ public final class AuditMessage implements Message {
private String op;
private Map<String, String> params;
private String ret;
+ private String performance;
public Builder setUser(String usr) {
this.user = usr;
@@ -109,15 +111,23 @@ public final class AuditMessage implements Message {
return this;
}
+ public Builder setPerformance(String perf) {
+ this.performance = perf;
+ return this;
+ }
+
public AuditMessage build() {
- return new AuditMessage(user, ip, op, params, ret, throwable);
+ return new AuditMessage(user, ip, op, params, ret, throwable,
+ performance);
}
}
private String formMessage(String userStr, String ipStr, String opStr,
- Map<String, String> paramsMap, String retStr) {
+ Map<String, String> paramsMap, String retStr,
+ String performanceMap) {
+ String perf = performanceMap != null && !performanceMap.isEmpty()
+ ? " | perf=" + performanceMap : "";
return "user=" + userStr + " | ip=" + ipStr + " | " + "op=" + opStr
- + " " + paramsMap + " | " + "ret=" + retStr;
-
+ + " " + paramsMap + " | ret=" + retStr + perf;
}
}
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/audit/S3GAction.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/audit/S3GAction.java
index dcf116ea0f..20c2f4c627 100644
---
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/audit/S3GAction.java
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/audit/S3GAction.java
@@ -36,6 +36,7 @@ public enum S3GAction implements AuditAction {
//ObjectEndpoint
CREATE_MULTIPART_KEY,
+ CREATE_MULTIPART_KEY_BY_COPY,
COPY_OBJECT,
CREATE_KEY,
LIST_PARTS,
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/BucketEndpoint.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/BucketEndpoint.java
index 6ab3a4ba7f..910b0026e3 100644
---
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/BucketEndpoint.java
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/BucketEndpoint.java
@@ -69,6 +69,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import static
org.apache.hadoop.ozone.audit.AuditLogger.PerformanceStringBuilder;
import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
import static
org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_LIST_KEYS_SHALLOW_ENABLED;
@@ -113,6 +114,8 @@ public class BucketEndpoint extends EndpointBase {
@Context HttpHeaders hh) throws OS3Exception, IOException {
long startNanos = Time.monotonicNowNanos();
S3GAction s3GAction = S3GAction.GET_BUCKET;
+ PerformanceStringBuilder perf = new PerformanceStringBuilder();
+
Iterator<? extends OzoneKey> ozoneKeyIterator;
ContinueToken decodedToken =
ContinueToken.decodeFromString(continueToken);
@@ -264,12 +267,15 @@ public class BucketEndpoint extends EndpointBase {
response.setTruncated(false);
}
- AUDIT.logReadSuccess(buildAuditMessageForSuccess(s3GAction,
- getAuditParameters()));
int keyCount =
response.getCommonPrefixes().size() + response.getContents().size();
- getMetrics().updateGetBucketSuccessStats(startNanos);
+ long opLatencyNs =
+ getMetrics().updateGetBucketSuccessStats(startNanos);
getMetrics().incListKeyCount(keyCount);
+ perf.appendCount(keyCount);
+ perf.appendOpLatencyNanos(opLatencyNs);
+ AUDIT.logReadSuccess(buildAuditMessageForSuccess(s3GAction,
+ getAuditParameters(), perf.build()));
response.setKeyCount(keyCount);
return Response.ok(response).build();
}
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
index 05b7a62c06..abda4678dc 100644
---
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
@@ -353,6 +353,14 @@ public abstract class EndpointBase implements Auditor {
return builder.build();
}
+ public AuditMessage buildAuditMessageForSuccess(AuditAction op,
+ Map<String, String> auditMap, String performance) {
+ AuditMessage.Builder builder = auditMessageBaseBuilder(op, auditMap)
+ .withResult(AuditEventStatus.SUCCESS);
+ builder.setPerformance(performance);
+ return builder.build();
+ }
+
@Override
public AuditMessage buildAuditMessageForFailure(AuditAction op,
Map<String, String> auditMap, Throwable throwable) {
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index ad4bbebeeb..b607b1c5cf 100644
---
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -112,6 +112,7 @@ import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_T
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_REPLICATION;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_REPLICATION_TYPE;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT;
+import static
org.apache.hadoop.ozone.audit.AuditLogger.PerformanceStringBuilder;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS;
import static
org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT;
import static
org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_KEY;
@@ -218,16 +219,21 @@ public class ObjectEndpoint extends EndpointBase {
InputStream body) throws IOException, OS3Exception {
long startNanos = Time.monotonicNowNanos();
S3GAction s3GAction = S3GAction.CREATE_KEY;
-
boolean auditSuccess = true;
+ PerformanceStringBuilder perf = new PerformanceStringBuilder();
+
String copyHeader = null, storageType = null;
try {
OzoneVolume volume = getVolume();
if (uploadID != null && !uploadID.equals("")) {
- s3GAction = S3GAction.CREATE_MULTIPART_KEY;
+ if (headers.getHeaderString(COPY_SOURCE_HEADER) == null) {
+ s3GAction = S3GAction.CREATE_MULTIPART_KEY;
+ } else {
+ s3GAction = S3GAction.CREATE_MULTIPART_KEY_BY_COPY;
+ }
// If uploadID is specified, it is a request for upload part
return createMultipartKey(volume, bucketName, keyPath, length,
- partNumber, uploadID, body);
+ partNumber, uploadID, body, perf);
}
copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
@@ -251,7 +257,7 @@ public class ObjectEndpoint extends EndpointBase {
s3GAction = S3GAction.COPY_OBJECT;
CopyObjectResponse copyObjectResponse = copyObject(volume,
copyHeader, bucketName, keyPath, replicationConfig,
- storageTypeDefault);
+ storageTypeDefault, perf);
return Response.status(Status.OK).entity(copyObjectResponse).header(
"Connection", "close").build();
}
@@ -270,6 +276,9 @@ public class ObjectEndpoint extends EndpointBase {
s3GAction = S3GAction.CREATE_DIRECTORY;
getClientProtocol()
.createDirectory(volume.getName(), bucketName, keyPath);
+ long metadataLatencyNs =
+ getMetrics().updatePutKeyMetadataStats(startNanos);
+ perf.appendMetaLatencyNanos(metadataLatencyNs);
return Response.ok().status(HttpStatus.SC_OK).build();
}
@@ -297,17 +306,19 @@ public class ObjectEndpoint extends EndpointBase {
long putLength;
String eTag = null;
if (datastreamEnabled && !enableEC && length > datastreamMinLength) {
- getMetrics().updatePutKeyMetadataStats(startNanos);
+ perf.appendStreamMode();
Pair<String, Long> keyWriteResult = ObjectEndpointStreaming
.put(bucket, keyPath, length, replicationConfig, chunkSize,
- customMetadata, (DigestInputStream) body);
+ customMetadata, (DigestInputStream) body, perf);
eTag = keyWriteResult.getKey();
putLength = keyWriteResult.getValue();
} else {
try (OzoneOutputStream output = getClientProtocol().createKey(
volume.getName(), bucketName, keyPath, length, replicationConfig,
customMetadata)) {
- getMetrics().updatePutKeyMetadataStats(startNanos);
+ long metadataLatencyNs =
+ getMetrics().updatePutKeyMetadataStats(startNanos);
+ perf.appendMetaLatencyNanos(metadataLatencyNs);
putLength = IOUtils.copyLarge(body, output);
eTag = DatatypeConverter.printHexBinary(
((DigestInputStream) body).getMessageDigest().digest())
@@ -315,8 +326,8 @@ public class ObjectEndpoint extends EndpointBase {
output.getMetadata().put(ETAG, eTag);
}
}
-
getMetrics().incPutKeySuccessLength(putLength);
+ perf.appendSizeBytes(putLength);
return Response.ok()
.header(ETAG, wrapInQuotes(eTag))
.status(HttpStatus.SC_OK)
@@ -356,9 +367,10 @@ public class ObjectEndpoint extends EndpointBase {
throw ex;
} finally {
if (auditSuccess) {
- AUDIT.logWriteSuccess(
- buildAuditMessageForSuccess(s3GAction, getAuditParameters()));
- getMetrics().updateCreateKeySuccessStats(startNanos);
+ long opLatencyNs =
getMetrics().updateCreateKeySuccessStats(startNanos);
+ perf.appendOpLatencyNanos(opLatencyNs);
+ AUDIT.logWriteSuccess(buildAuditMessageForSuccess(s3GAction,
+ getAuditParameters(), perf.build()));
}
}
}
@@ -372,6 +384,7 @@ public class ObjectEndpoint extends EndpointBase {
* https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadListParts.html
* for more details.
*/
+ @SuppressWarnings("checkstyle:MethodLength")
@GET
public Response get(
@PathParam("bucket") String bucketName,
@@ -383,15 +396,17 @@ public class ObjectEndpoint extends EndpointBase {
throws IOException, OS3Exception {
long startNanos = Time.monotonicNowNanos();
S3GAction s3GAction = S3GAction.GET_KEY;
- boolean auditSuccess = true;
-
+ PerformanceStringBuilder perf = new PerformanceStringBuilder();
try {
if (uploadId != null) {
// When we have uploadId, this is the request for list Parts.
s3GAction = S3GAction.LIST_PARTS;
int partMarker = parsePartNumberMarker(partNumberMarker);
- return listParts(bucketName, keyPath, uploadId,
- partMarker, maxParts);
+ Response response = listParts(bucketName, keyPath, uploadId,
+ partMarker, maxParts, perf);
+ AUDIT.logReadSuccess(buildAuditMessageForSuccess(s3GAction,
+ getAuditParameters(), perf.build()));
+ return response;
}
OzoneKeyDetails keyDetails = (partNumber != 0) ?
@@ -424,8 +439,12 @@ public class ObjectEndpoint extends EndpointBase {
try (OzoneInputStream key = keyDetails.getContent()) {
long readLength = IOUtils.copyLarge(key, dest);
getMetrics().incGetKeySuccessLength(readLength);
+ perf.appendSizeBytes(readLength);
}
- getMetrics().updateGetKeySuccessStats(startNanos);
+ long opLatencyNs =
getMetrics().updateGetKeySuccessStats(startNanos);
+ perf.appendOpLatencyNanos(opLatencyNs);
+ AUDIT.logReadSuccess(buildAuditMessageForSuccess(S3GAction.GET_KEY,
+ getAuditParameters(), perf.build()));
};
responseBuilder = Response
.ok(output)
@@ -444,8 +463,12 @@ public class ObjectEndpoint extends EndpointBase {
long readLength = IOUtils.copyLarge(ozoneInputStream, dest, 0,
copyLength, new byte[bufferSize]);
getMetrics().incGetKeySuccessLength(readLength);
+ perf.appendSizeBytes(readLength);
}
- getMetrics().updateGetKeySuccessStats(startNanos);
+ long opLatencyNs = getMetrics().updateGetKeySuccessStats(startNanos);
+ perf.appendOpLatencyNanos(opLatencyNs);
+ AUDIT.logReadSuccess(buildAuditMessageForSuccess(S3GAction.GET_KEY,
+ getAuditParameters(), perf.build()));
};
responseBuilder = Response
.status(Status.PARTIAL_CONTENT)
@@ -486,10 +509,11 @@ public class ObjectEndpoint extends EndpointBase {
}
}
addLastModifiedDate(responseBuilder, keyDetails);
- getMetrics().updateGetKeyMetadataStats(startNanos);
+ long metadataLatencyNs =
+ getMetrics().updateGetKeyMetadataStats(startNanos);
+ perf.appendMetaLatencyNanos(metadataLatencyNs);
return responseBuilder.build();
} catch (OMException ex) {
- auditSuccess = false;
AUDIT.logReadFailure(
buildAuditMessageForFailure(s3GAction, getAuditParameters(), ex)
);
@@ -508,17 +532,10 @@ public class ObjectEndpoint extends EndpointBase {
throw ex;
}
} catch (Exception ex) {
- auditSuccess = false;
AUDIT.logReadFailure(
buildAuditMessageForFailure(s3GAction, getAuditParameters(), ex)
);
throw ex;
- } finally {
- if (auditSuccess) {
- AUDIT.logReadSuccess(
- buildAuditMessageForSuccess(s3GAction, getAuditParameters())
- );
- }
}
}
@@ -847,10 +864,10 @@ public class ObjectEndpoint extends EndpointBase {
}
}
- @SuppressWarnings("checkstyle:MethodLength")
+ @SuppressWarnings({"checkstyle:MethodLength", "checkstyle:ParameterNumber"})
private Response createMultipartKey(OzoneVolume volume, String bucket,
- String key, long length, int partNumber,
- String uploadID, InputStream body)
+ String key, long length, int partNumber, String uploadID,
+ InputStream body, PerformanceStringBuilder perf)
throws IOException, OS3Exception {
long startNanos = Time.monotonicNowNanos();
String copyHeader = null;
@@ -880,16 +897,17 @@ public class ObjectEndpoint extends EndpointBase {
}
if (datastreamEnabled && !enableEC && copyHeader == null) {
- getMetrics().updatePutKeyMetadataStats(startNanos);
+ perf.appendStreamMode();
return ObjectEndpointStreaming
.createMultipartKey(ozoneBucket, key, length, partNumber,
- uploadID, chunkSize, (DigestInputStream) body);
+ uploadID, chunkSize, (DigestInputStream) body, perf);
}
// OmMultipartCommitUploadPartInfo can only be gotten after the
// OzoneOutputStream is closed, so we need to save the KeyOutputStream
// in the OzoneOutputStream and use it to get the
// OmMultipartCommitUploadPartInfo after OzoneOutputStream is closed.
KeyOutputStream keyOutputStream = null;
+ long metadataLatencyNs;
if (copyHeader != null) {
Pair<String, String> result = parseSourceHeader(copyHeader);
String sourceBucket = result.getLeft();
@@ -933,7 +951,8 @@ public class ObjectEndpoint extends EndpointBase {
try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
.createMultipartKey(volume.getName(), bucket, key, length,
partNumber, uploadID)) {
- getMetrics().updateCopyKeyMetadataStats(startNanos);
+ metadataLatencyNs =
+ getMetrics().updateCopyKeyMetadataStats(startNanos);
copyLength = IOUtils.copyLarge(
sourceObject, ozoneOutputStream, 0, length);
keyOutputStream = ozoneOutputStream.getKeyOutputStream();
@@ -942,19 +961,22 @@ public class ObjectEndpoint extends EndpointBase {
try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
.createMultipartKey(volume.getName(), bucket, key, length,
partNumber, uploadID)) {
- getMetrics().updateCopyKeyMetadataStats(startNanos);
+ metadataLatencyNs =
+ getMetrics().updateCopyKeyMetadataStats(startNanos);
copyLength = IOUtils.copyLarge(sourceObject, ozoneOutputStream);
keyOutputStream = ozoneOutputStream.getKeyOutputStream();
}
}
getMetrics().incCopyObjectSuccessLength(copyLength);
+ perf.appendSizeBytes(copyLength);
}
} else {
long putLength;
try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
.createMultipartKey(volume.getName(), bucket, key, length,
partNumber, uploadID)) {
- getMetrics().updatePutKeyMetadataStats(startNanos);
+ metadataLatencyNs =
+ getMetrics().updatePutKeyMetadataStats(startNanos);
putLength = IOUtils.copyLarge(body, ozoneOutputStream);
((KeyMetadataAware)ozoneOutputStream.getOutputStream())
.getMetadata().put(ETAG, DatatypeConverter.printHexBinary(
@@ -964,7 +986,9 @@ public class ObjectEndpoint extends EndpointBase {
= ozoneOutputStream.getKeyOutputStream();
}
getMetrics().incPutKeySuccessLength(putLength);
+ perf.appendSizeBytes(putLength);
}
+ perf.appendMetaLatencyNanos(metadataLatencyNs);
assert keyOutputStream != null;
OmMultipartCommitUploadPartInfo omMultipartCommitUploadPartInfo =
@@ -1012,7 +1036,8 @@ public class ObjectEndpoint extends EndpointBase {
* @throws OS3Exception
*/
private Response listParts(String bucket, String key, String uploadID,
- int partNumberMarker, int maxParts) throws IOException, OS3Exception {
+ int partNumberMarker, int maxParts, PerformanceStringBuilder perf)
+ throws IOException, OS3Exception {
long startNanos = Time.monotonicNowNanos();
ListPartsResponse listPartsResponse = new ListPartsResponse();
try {
@@ -1055,7 +1080,9 @@ public class ObjectEndpoint extends EndpointBase {
}
throw ex;
}
- getMetrics().updateListPartsSuccessStats(startNanos);
+ long opLatencyNs = getMetrics().updateListPartsSuccessStats(startNanos);
+ perf.appendCount(listPartsResponse.getPartList().size());
+ perf.appendOpLatencyNanos(opLatencyNs);
return Response.status(Status.OK).entity(listPartsResponse).build();
}
@@ -1069,33 +1096,40 @@ public class ObjectEndpoint extends EndpointBase {
this.context = context;
}
+ @SuppressWarnings("checkstyle:ParameterNumber")
void copy(OzoneVolume volume, InputStream src, long srcKeyLen,
String destKey, String destBucket,
ReplicationConfig replication,
- Map<String, String> metadata) throws IOException {
+ Map<String, String> metadata,
+ PerformanceStringBuilder perf, long startNanos)
+ throws IOException {
long copyLength;
if (datastreamEnabled && !(replication != null &&
replication.getReplicationType() == EC) &&
srcKeyLen > datastreamMinLength) {
+ perf.appendStreamMode();
copyLength = ObjectEndpointStreaming
.copyKeyWithStream(volume.getBucket(destBucket), destKey, srcKeyLen,
- chunkSize, replication, metadata, src);
+ chunkSize, replication, metadata, src, perf, startNanos);
} else {
try (OzoneOutputStream dest = getClientProtocol()
.createKey(volume.getName(), destBucket, destKey, srcKeyLen,
replication, metadata)) {
+ long metadataLatencyNs =
+ getMetrics().updateCopyKeyMetadataStats(startNanos);
+ perf.appendMetaLatencyNanos(metadataLatencyNs);
copyLength = IOUtils.copyLarge(src, dest);
}
}
getMetrics().incCopyObjectSuccessLength(copyLength);
+ perf.appendSizeBytes(copyLength);
}
+ @SuppressWarnings("checkstyle:ParameterNumber")
private CopyObjectResponse copyObject(OzoneVolume volume,
- String copyHeader,
- String destBucket,
- String destkey,
- ReplicationConfig replicationConfig,
- boolean storageTypeDefault)
+ String copyHeader, String destBucket, String destkey,
+ ReplicationConfig replicationConfig, boolean storageTypeDefault,
+ PerformanceStringBuilder perf)
throws OS3Exception, IOException {
long startNanos = Time.monotonicNowNanos();
Pair<String, String> result = parseSourceHeader(copyHeader);
@@ -1138,7 +1172,7 @@ public class ObjectEndpoint extends EndpointBase {
sourceBucket, sourceKey)) {
getMetrics().updateCopyKeyMetadataStats(startNanos);
copy(volume, src, sourceKeyLen, destkey, destBucket, replicationConfig,
- sourceKeyDetails.getMetadata());
+ sourceKeyDetails.getMetadata(), perf, startNanos);
}
final OzoneKeyDetails destKeyDetails = getClientProtocol().getKeyDetails(
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
index b536b3248b..dbc7f374a9 100644
---
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
@@ -30,6 +30,7 @@ import
org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
import org.apache.hadoop.ozone.s3.metrics.S3GatewayMetrics;
+import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +40,7 @@ import java.nio.ByteBuffer;
import java.security.DigestInputStream;
import java.util.Map;
+import static
org.apache.hadoop.ozone.audit.AuditLogger.PerformanceStringBuilder;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS;
import static
org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_REQUEST;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.NO_SUCH_UPLOAD;
@@ -54,16 +56,17 @@ final class ObjectEndpointStreaming {
private ObjectEndpointStreaming() {
}
+ @SuppressWarnings("checkstyle:ParameterNumber")
public static Pair<String, Long> put(
OzoneBucket bucket, String keyPath,
long length, ReplicationConfig replicationConfig,
int chunkSize, Map<String, String> keyMetadata,
- DigestInputStream body)
+ DigestInputStream body, PerformanceStringBuilder perf)
throws IOException, OS3Exception {
try {
return putKeyWithStream(bucket, keyPath,
- length, chunkSize, replicationConfig, keyMetadata, body);
+ length, chunkSize, replicationConfig, keyMetadata, body, perf);
} catch (IOException ex) {
LOG.error("Exception occurred in PutObject", ex);
if (ex instanceof OMException) {
@@ -86,6 +89,7 @@ final class ObjectEndpointStreaming {
}
}
+ @SuppressWarnings("checkstyle:ParameterNumber")
public static Pair<String, Long> putKeyWithStream(
OzoneBucket bucket,
String keyPath,
@@ -93,20 +97,25 @@ final class ObjectEndpointStreaming {
int bufferSize,
ReplicationConfig replicationConfig,
Map<String, String> keyMetadata,
- DigestInputStream body)
+ DigestInputStream body, PerformanceStringBuilder perf)
throws IOException {
+ S3GatewayMetrics metrics = S3GatewayMetrics.create();
+ long startNanos = Time.monotonicNowNanos();
long writeLen;
String eTag;
try (OzoneDataStreamOutput streamOutput = bucket.createStreamKey(keyPath,
length, replicationConfig, keyMetadata)) {
+ long metadataLatencyNs = metrics.updatePutKeyMetadataStats(startNanos);
writeLen = writeToStreamOutput(streamOutput, body, bufferSize, length);
eTag = DatatypeConverter.printHexBinary(body.getMessageDigest().digest())
.toLowerCase();
+ perf.appendMetaLatencyNanos(metadataLatencyNs);
((KeyMetadataAware)streamOutput).getMetadata().put("ETag", eTag);
}
return Pair.of(eTag, writeLen);
}
+ @SuppressWarnings("checkstyle:ParameterNumber")
public static long copyKeyWithStream(
OzoneBucket bucket,
String keyPath,
@@ -114,10 +123,15 @@ final class ObjectEndpointStreaming {
int bufferSize,
ReplicationConfig replicationConfig,
Map<String, String> keyMetadata,
- InputStream body) throws IOException {
+ InputStream body, PerformanceStringBuilder perf, long startNanos)
+ throws IOException {
long writeLen = 0;
+ S3GatewayMetrics metrics = S3GatewayMetrics.create();
try (OzoneDataStreamOutput streamOutput = bucket.createStreamKey(keyPath,
length, replicationConfig, keyMetadata)) {
+ long metadataLatencyNs =
+ metrics.updateCopyKeyMetadataStats(startNanos);
+ perf.appendMetaLatencyNanos(metadataLatencyNs);
writeLen = writeToStreamOutput(streamOutput, body, bufferSize, length);
}
return writeLen;
@@ -141,11 +155,12 @@ final class ObjectEndpointStreaming {
return n;
}
+ @SuppressWarnings("checkstyle:ParameterNumber")
public static Response createMultipartKey(OzoneBucket ozoneBucket, String
key,
- long length, int partNumber,
- String uploadID, int chunkSize,
- DigestInputStream body)
+ long length, int partNumber, String uploadID, int chunkSize,
+ DigestInputStream body, PerformanceStringBuilder perf)
throws IOException, OS3Exception {
+ long startNanos = Time.monotonicNowNanos();
String eTag;
S3GatewayMetrics metrics = S3GatewayMetrics.create();
// OmMultipartCommitUploadPartInfo can only be gotten after the
@@ -156,12 +171,15 @@ final class ObjectEndpointStreaming {
try {
try (OzoneDataStreamOutput streamOutput = ozoneBucket
.createMultipartStreamKey(key, length, partNumber, uploadID)) {
+ long metadataLatencyNs = metrics.updatePutKeyMetadataStats(startNanos);
long putLength =
writeToStreamOutput(streamOutput, body, chunkSize, length);
eTag = DatatypeConverter.printHexBinary(
body.getMessageDigest().digest()).toLowerCase();
((KeyMetadataAware)streamOutput).getMetadata().put("ETag", eTag);
metrics.incPutKeySuccessLength(putLength);
+ perf.appendMetaLatencyNanos(metadataLatencyNs);
+ perf.appendSizeBytes(putLength);
keyDataStreamOutput = streamOutput.getKeyDataStreamOutput();
}
} catch (OMException ex) {
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/metrics/S3GatewayMetrics.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/metrics/S3GatewayMetrics.java
index b18b9f3354..10b7b167b9 100644
---
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/metrics/S3GatewayMetrics.java
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/metrics/S3GatewayMetrics.java
@@ -362,9 +362,9 @@ public final class S3GatewayMetrics implements
MetricsSource {
// INC and UPDATE
// BucketEndpoint
- public void updateGetBucketSuccessStats(long startNanos) {
+ public long updateGetBucketSuccessStats(long startNanos) {
getBucketSuccess.incr();
- getBucketSuccessLatencyNs.add(Time.monotonicNowNanos() - startNanos);
+ return updateAndGetStats(getBucketSuccessLatencyNs, startNanos);
}
public void updateGetBucketFailureStats(long startNanos) {
@@ -447,10 +447,9 @@ public final class S3GatewayMetrics implements
MetricsSource {
// ObjectEndpoint
- public void updateCreateMultipartKeySuccessStats(long startNanos) {
+ public long updateCreateMultipartKeySuccessStats(long startNanos) {
createMultipartKeySuccess.incr();
- createMultipartKeySuccessLatencyNs.add(
- Time.monotonicNowNanos() - startNanos);
+ return updateAndGetStats(createMultipartKeySuccessLatencyNs, startNanos);
}
public void updateCreateMultipartKeyFailureStats(long startNanos) {
@@ -459,9 +458,9 @@ public final class S3GatewayMetrics implements
MetricsSource {
Time.monotonicNowNanos() - startNanos);
}
- public void updateCopyObjectSuccessStats(long startNanos) {
+ public long updateCopyObjectSuccessStats(long startNanos) {
copyObjectSuccess.incr();
- copyObjectSuccessLatencyNs.add(Time.monotonicNowNanos() - startNanos);
+ return updateAndGetStats(copyObjectSuccessLatencyNs, startNanos);
}
public void updateCopyObjectFailureStats(long startNanos) {
@@ -469,9 +468,9 @@ public final class S3GatewayMetrics implements
MetricsSource {
copyObjectFailureLatencyNs.add(Time.monotonicNowNanos() - startNanos);
}
- public void updateCreateKeySuccessStats(long startNanos) {
+ public long updateCreateKeySuccessStats(long startNanos) {
createKeySuccess.incr();
- createKeySuccessLatencyNs.add(Time.monotonicNowNanos() - startNanos);
+ return updateAndGetStats(createKeySuccessLatencyNs, startNanos);
}
public void updateCreateKeyFailureStats(long startNanos) {
@@ -479,9 +478,9 @@ public final class S3GatewayMetrics implements
MetricsSource {
createKeyFailureLatencyNs.add(Time.monotonicNowNanos() - startNanos);
}
- public void updateListPartsSuccessStats(long startNanos) {
+ public long updateListPartsSuccessStats(long startNanos) {
listPartsSuccess.incr();
- listPartsSuccessLatencyNs.add(Time.monotonicNowNanos() - startNanos);
+ return updateAndGetStats(listPartsSuccessLatencyNs, startNanos);
}
public void updateListPartsFailureStats(long startNanos) {
@@ -489,9 +488,9 @@ public final class S3GatewayMetrics implements
MetricsSource {
listPartsFailureLatencyNs.add(Time.monotonicNowNanos() - startNanos);
}
- public void updateGetKeySuccessStats(long startNanos) {
+ public long updateGetKeySuccessStats(long startNanos) {
getKeySuccess.incr();
- getKeySuccessLatencyNs.add(Time.monotonicNowNanos() - startNanos);
+ return updateAndGetStats(getKeySuccessLatencyNs, startNanos);
}
public void updateGetKeyFailureStats(long startNanos) {
@@ -555,16 +554,16 @@ public final class S3GatewayMetrics implements
MetricsSource {
deleteKeyFailureLatencyNs.add(Time.monotonicNowNanos() - startNanos);
}
- public void updateGetKeyMetadataStats(long startNanos) {
- getKeyMetadataLatencyNs.add(Time.monotonicNowNanos() - startNanos);
+ public long updateGetKeyMetadataStats(long startNanos) {
+ return updateAndGetStats(getKeyMetadataLatencyNs, startNanos);
}
- public void updateCopyKeyMetadataStats(long startNanos) {
- copyKeyMetadataLatencyNs.add(Time.monotonicNowNanos() - startNanos);
+ public long updateCopyKeyMetadataStats(long startNanos) {
+ return updateAndGetStats(copyKeyMetadataLatencyNs, startNanos);
}
- public void updatePutKeyMetadataStats(long startNanos) {
- putKeyMetadataLatencyNs.add(Time.monotonicNowNanos() - startNanos);
+ public long updatePutKeyMetadataStats(long startNanos) {
+ return updateAndGetStats(putKeyMetadataLatencyNs, startNanos);
}
public void incCopyObjectSuccessLength(long bytes) {
@@ -719,4 +718,10 @@ public final class S3GatewayMetrics implements
MetricsSource {
public long getListS3BucketsFailure() {
return listS3BucketsFailure.value();
}
+
+ private long updateAndGetStats(MutableRate metric, long startNanos) {
+ long value = Time.monotonicNowNanos() - startNanos;
+ metric.add(value);
+ return value;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]