This is an automated email from the ASF dual-hosted git repository.
peterxcli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new cd2e68c502e HDDS-14427. Pull up ObjectEndpoint helpers into EndpointBase (#9636)
cd2e68c502e is described below
commit cd2e68c502e7271e18e87a3fdd104a8bd89109ca
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Thu Jan 15 06:27:27 2026 +0100
HDDS-14427. Pull up ObjectEndpoint helpers into EndpointBase (#9636)
---
.../hadoop/ozone/s3/endpoint/EndpointBase.java | 266 ++++++++++++++++++++
.../hadoop/ozone/s3/endpoint/ObjectEndpoint.java | 272 +--------------------
2 files changed, 273 insertions(+), 265 deletions(-)
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
index 356272ccd40..02a09002b06 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
@@ -18,21 +18,42 @@
package org.apache.hadoop.ozone.s3.endpoint;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConsts.ETAG;
import static org.apache.hadoop.ozone.OzoneConsts.KB;
+import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_KEY;
+import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_ARGUMENT;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_TAG;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.newError;
import static org.apache.hadoop.ozone.s3.util.S3Consts.AWS_TAG_PREFIX;
import static org.apache.hadoop.ozone.s3.util.S3Consts.CUSTOM_METADATA_HEADER_PREFIX;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CONFIG_HEADER;
import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_HEADER;
import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_KEY_LENGTH_LIMIT;
import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_NUM_LIMIT;
import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_REGEX_PATTERN;
import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_VALUE_LENGTH_LIMIT;
+import static org.apache.hadoop.ozone.s3.util.S3Utils.hasMultiChunksPayload;
+import static org.apache.hadoop.ozone.s3.util.S3Utils.hasUnsignedPayload;
+import static org.apache.hadoop.ozone.s3.util.S3Utils.urlDecode;
+import static org.apache.hadoop.ozone.s3.util.S3Utils.validateMultiChunksUpload;
+import static org.apache.hadoop.ozone.s3.util.S3Utils.validateSignatureHeader;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.text.ParseException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -40,6 +61,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -51,8 +73,13 @@
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
+import net.jcip.annotations.Immutable;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.audit.AuditAction;
import org.apache.hadoop.ozone.audit.AuditEventStatus;
@@ -62,19 +89,25 @@
import org.apache.hadoop.ozone.audit.AuditMessage;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.ozone.client.OzoneKey;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.om.protocol.S3Auth;
+import org.apache.hadoop.ozone.s3.MultiDigestInputStream;
import org.apache.hadoop.ozone.s3.RequestIdentifier;
+import org.apache.hadoop.ozone.s3.SignedChunksInputStream;
+import org.apache.hadoop.ozone.s3.UnsignedChunksInputStream;
import org.apache.hadoop.ozone.s3.commontypes.RequestParameters;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
import org.apache.hadoop.ozone.s3.metrics.S3GatewayMetrics;
import org.apache.hadoop.ozone.s3.signature.SignatureInfo;
import org.apache.hadoop.ozone.s3.util.AuditUtils;
+import org.apache.hadoop.ozone.s3.util.S3Utils;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.util.Time;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URLEncodedUtils;
@@ -88,6 +121,27 @@ public abstract class EndpointBase {
protected static final String ETAG_CUSTOM = "etag-custom";
+ private static final ThreadLocal<MessageDigest> E_TAG_PROVIDER;
+ private static final ThreadLocal<MessageDigest> SHA_256_PROVIDER;
+
+ static {
+ E_TAG_PROVIDER = ThreadLocal.withInitial(() -> {
+ try {
+ return MessageDigest.getInstance(OzoneConsts.MD5_HASH);
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ SHA_256_PROVIDER = ThreadLocal.withInitial(() -> {
+ try {
+ return MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
@Inject
private OzoneConfiguration ozoneConfiguration;
@@ -100,6 +154,10 @@ public abstract class EndpointBase {
private RequestIdentifier requestIdentifier;
private S3Auth s3Auth;
+ private int bufferSize;
+ private int chunkSize;
+ private boolean datastreamEnabled;
+ private long datastreamMinLength;
@Context
private ContainerRequestContext context;
@@ -166,6 +224,21 @@ public void initialization() {
getClient().getObjectStore().getClientProxy();
clientProtocol.setThreadLocalS3Auth(s3Auth);
clientProtocol.setIsS3Request(true);
+
+ bufferSize = (int) getOzoneConfiguration().getStorageSize(
+ OZONE_S3G_CLIENT_BUFFER_SIZE_KEY,
+ OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT, StorageUnit.BYTES);
+ chunkSize = (int) getOzoneConfiguration().getStorageSize(
+ OZONE_SCM_CHUNK_SIZE_KEY,
+ OZONE_SCM_CHUNK_SIZE_DEFAULT,
+ StorageUnit.BYTES);
+ datastreamEnabled = getOzoneConfiguration().getBoolean(
+ HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED,
+ HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED_DEFAULT);
+ datastreamMinLength = (long) getOzoneConfiguration().getStorageSize(
+ OZONE_FS_DATASTREAM_AUTO_THRESHOLD,
+ OZONE_FS_DATASTREAM_AUTO_THRESHOLD_DEFAULT, StorageUnit.BYTES);
+
init();
}
@@ -603,4 +676,197 @@ protected boolean isAccessDenied(OMException ex) {
return result == ResultCodes.PERMISSION_DENIED
|| result == ResultCodes.INVALID_TOKEN;
}
+
+ protected ReplicationConfig getReplicationConfig(OzoneBucket ozoneBucket) throws OS3Exception {
+ String storageType = getHeaders().getHeaderString(STORAGE_CLASS_HEADER);
+ String storageConfig = getHeaders().getHeaderString(CUSTOM_METADATA_HEADER_PREFIX + STORAGE_CONFIG_HEADER);
+
+ ReplicationConfig clientConfiguredReplicationConfig =
+ OzoneClientUtils.getClientConfiguredReplicationConfig(getOzoneConfiguration());
+
+ return S3Utils.resolveS3ClientSideReplicationConfig(storageType, storageConfig,
+ clientConfiguredReplicationConfig, ozoneBucket.getReplicationConfig());
+ }
+
+ /**
+ * Parse the key and bucket name from copy header.
+ */
+ public static Pair<String, String> parseSourceHeader(String copyHeader)
+ throws OS3Exception {
+ String header = copyHeader;
+ if (header.startsWith("/")) {
+ header = copyHeader.substring(1);
+ }
+ int pos = header.indexOf('/');
+ if (pos == -1) {
+ OS3Exception ex = newError(INVALID_ARGUMENT, header);
+ ex.setErrorMessage("Copy Source must mention the source bucket and " +
+ "key: sourcebucket/sourcekey");
+ throw ex;
+ }
+
+ try {
+ String bucket = header.substring(0, pos);
+ String key = urlDecode(header.substring(pos + 1));
+ return Pair.of(bucket, key);
+ } catch (UnsupportedEncodingException e) {
+ OS3Exception ex = newError(INVALID_ARGUMENT, header, e);
+ ex.setErrorMessage("Copy Source header could not be url-decoded");
+ throw ex;
+ }
+ }
+
+ protected static int parsePartNumberMarker(String partNumberMarker) {
+ int partMarker = 0;
+ if (partNumberMarker != null) {
+ partMarker = Integer.parseInt(partNumberMarker);
+ }
+ return partMarker;
+ }
+
+ // Parses date string and return long representation. Returns an
+ // empty if DateStr is null or invalid. Dates in the future are
+ // considered invalid.
+ private static OptionalLong parseAndValidateDate(String ozoneDateStr) {
+ long ozoneDateInMs;
+ if (ozoneDateStr == null) {
+ return OptionalLong.empty();
+ }
+ try {
+ ozoneDateInMs = OzoneUtils.formatDate(ozoneDateStr);
+ } catch (ParseException e) {
+ // if time not parseable, then return empty()
+ return OptionalLong.empty();
+ }
+
+ long currentDate = System.currentTimeMillis();
+ if (ozoneDateInMs <= currentDate) {
+ return OptionalLong.of(ozoneDateInMs);
+ } else {
+ // dates in the future are invalid, so return empty()
+ return OptionalLong.empty();
+ }
+ }
+
+ public static boolean checkCopySourceModificationTime(
+ Long lastModificationTime,
+ String copySourceIfModifiedSinceStr,
+ String copySourceIfUnmodifiedSinceStr) {
+ long copySourceIfModifiedSince = Long.MIN_VALUE;
+ long copySourceIfUnmodifiedSince = Long.MAX_VALUE;
+
+ OptionalLong modifiedDate =
+ parseAndValidateDate(copySourceIfModifiedSinceStr);
+ if (modifiedDate.isPresent()) {
+ copySourceIfModifiedSince = modifiedDate.getAsLong();
+ }
+
+ OptionalLong unmodifiedDate =
+ parseAndValidateDate(copySourceIfUnmodifiedSinceStr);
+ if (unmodifiedDate.isPresent()) {
+ copySourceIfUnmodifiedSince = unmodifiedDate.getAsLong();
+ }
+ return (copySourceIfModifiedSince <= lastModificationTime) &&
+ (lastModificationTime <= copySourceIfUnmodifiedSince);
+ }
+
+ /**
+ * Create a {@link S3ChunkInputStreamInfo} that contains the necessary information to handle
+ * the S3 chunk upload.
+ */
+ protected S3ChunkInputStreamInfo getS3ChunkInputStreamInfo(
+ InputStream body, long contentLength, String amzDecodedLength, String keyPath) throws OS3Exception {
+ final String amzContentSha256Header = validateSignatureHeader(getHeaders(), keyPath, signatureInfo.isSignPayload());
+ final InputStream chunkInputStream;
+ final long effectiveLength;
+ if (hasMultiChunksPayload(amzContentSha256Header)) {
+ validateMultiChunksUpload(getHeaders(), amzDecodedLength, keyPath);
+ if (hasUnsignedPayload(amzContentSha256Header)) {
+ chunkInputStream = new UnsignedChunksInputStream(body);
+ } else {
+ chunkInputStream = new SignedChunksInputStream(body);
+ }
+ effectiveLength = Long.parseLong(amzDecodedLength);
+ } else {
+ // Single chunk upload: https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-header-based-auth.html
+ // Possible x-amz-content-sha256 header values
+ // - Actual payload checksum value: For signed payload
+ // - UNSIGNED-PAYLOAD: For unsigned payload
+ chunkInputStream = body;
+ effectiveLength = contentLength;
+ }
+
+ // MessageDigest is used for ETag calculation
+ // and Sha256Digest is used for "x-amz-content-sha256" header verification
+ List<MessageDigest> digests = new ArrayList<>();
+ digests.add(getMessageDigestInstance());
+ if (!hasUnsignedPayload(amzContentSha256Header) && !hasMultiChunksPayload(amzContentSha256Header)) {
+ digests.add(getSha256DigestInstance());
+ }
+ MultiDigestInputStream multiDigestInputStream =
+ new MultiDigestInputStream(chunkInputStream, digests);
+ return new S3ChunkInputStreamInfo(multiDigestInputStream, effectiveLength);
+ }
+
+ public boolean isDatastreamEnabled() {
+ return datastreamEnabled;
+ }
+
+ public long getDatastreamMinLength() {
+ return datastreamMinLength;
+ }
+
+ public int getChunkSize() {
+ return chunkSize;
+ }
+
+ public MessageDigest getMessageDigestInstance() {
+ return E_TAG_PROVIDER.get();
+ }
+
+ public MessageDigest getSha256DigestInstance() {
+ return SHA_256_PROVIDER.get();
+ }
+
+ protected static String extractPartsCount(String eTag) {
+ if (eTag.contains("-")) {
+ String[] parts = eTag.replace("\"", "").split("-");
+ String lastPart = parts[parts.length - 1];
+ return lastPart.matches("\\d+") ? lastPart : null;
+ }
+ return null;
+ }
+
+ protected int getIOBufferSize(long fileLength) {
+ if (bufferSize == 0) {
+ // this is mainly for unit tests as init() will not be called in the unit tests
+ LOG.warn("buffer size is set to {}", IOUtils.DEFAULT_BUFFER_SIZE);
+ bufferSize = IOUtils.DEFAULT_BUFFER_SIZE;
+ }
+ if (fileLength == 0) {
+ // for empty file
+ return bufferSize;
+ } else {
+ return fileLength < bufferSize ? (int) fileLength : bufferSize;
+ }
+ }
+
+ @Immutable
+ protected static final class S3ChunkInputStreamInfo {
+ private final MultiDigestInputStream multiDigestInputStream;
+ private final long effectiveLength;
+
+ S3ChunkInputStreamInfo(MultiDigestInputStream multiDigestInputStream, long effectiveLength) {
+ this.multiDigestInputStream = multiDigestInputStream;
+ this.effectiveLength = effectiveLength;
+ }
+
+ public MultiDigestInputStream getMultiDigestInputStream() {
+ return multiDigestInputStream;
+ }
+
+ public long getEffectiveLength() {
+ return effectiveLength;
+ }
+ }
}
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 773b3688688..6589083b96d 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -18,15 +18,7 @@
package org.apache.hadoop.ozone.s3.endpoint;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD_DEFAULT;
import static org.apache.hadoop.ozone.audit.AuditLogger.PerformanceStringBuilder;
-import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT;
-import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_KEY;
import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_FSO_DIRECTORY_CREATION_ENABLED;
import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_FSO_DIRECTORY_CREATION_ENABLED_DEFAULT;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.ENTITY_TOO_SMALL;
@@ -43,43 +35,31 @@
import static org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_IF_MODIFIED_SINCE;
import static org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_IF_UNMODIFIED_SINCE;
import static org.apache.hadoop.ozone.s3.util.S3Consts.CUSTOM_METADATA_COPY_DIRECTIVE_HEADER;
-import static org.apache.hadoop.ozone.s3.util.S3Consts.CUSTOM_METADATA_HEADER_PREFIX;
import static org.apache.hadoop.ozone.s3.util.S3Consts.CopyDirective;
import static org.apache.hadoop.ozone.s3.util.S3Consts.DECODED_CONTENT_LENGTH_HEADER;
import static org.apache.hadoop.ozone.s3.util.S3Consts.MP_PARTS_COUNT;
import static org.apache.hadoop.ozone.s3.util.S3Consts.RANGE_HEADER;
import static org.apache.hadoop.ozone.s3.util.S3Consts.RANGE_HEADER_SUPPORTED_UNIT;
import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
-import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CONFIG_HEADER;
import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_COUNT_HEADER;
import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_DIRECTIVE_HEADER;
-import static org.apache.hadoop.ozone.s3.util.S3Utils.hasMultiChunksPayload;
-import static org.apache.hadoop.ozone.s3.util.S3Utils.hasUnsignedPayload;
import static org.apache.hadoop.ozone.s3.util.S3Utils.stripQuotes;
-import static org.apache.hadoop.ozone.s3.util.S3Utils.urlDecode;
-import static org.apache.hadoop.ozone.s3.util.S3Utils.validateMultiChunksUpload;
import static org.apache.hadoop.ozone.s3.util.S3Utils.validateSignatureHeader;
import static org.apache.hadoop.ozone.s3.util.S3Utils.wrapInQuotes;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
import java.security.DigestInputStream;
import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.text.ParseException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.OptionalLong;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
@@ -98,17 +78,14 @@
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import javax.xml.bind.DatatypeConverter;
-import net.jcip.annotations.Immutable;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.audit.S3GAction;
import org.apache.hadoop.ozone.client.OzoneBucket;
-import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.ozone.client.OzoneKey;
import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
@@ -124,8 +101,6 @@
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.s3.HeaderPreprocessor;
import org.apache.hadoop.ozone.s3.MultiDigestInputStream;
-import org.apache.hadoop.ozone.s3.SignedChunksInputStream;
-import org.apache.hadoop.ozone.s3.UnsignedChunksInputStream;
import org.apache.hadoop.ozone.s3.endpoint.S3Tagging.Tag;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
@@ -135,8 +110,6 @@
import org.apache.hadoop.ozone.s3.util.S3Consts;
import org.apache.hadoop.ozone.s3.util.S3Consts.QueryParams;
import org.apache.hadoop.ozone.s3.util.S3StorageType;
-import org.apache.hadoop.ozone.s3.util.S3Utils;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.util.Time;
import org.apache.http.HttpStatus;
import org.apache.ratis.util.function.CheckedRunnable;
@@ -155,34 +128,9 @@ public class ObjectEndpoint extends EndpointBase {
private static final Logger LOG =
LoggerFactory.getLogger(ObjectEndpoint.class);
- private static final ThreadLocal<MessageDigest> E_TAG_PROVIDER;
- private static final ThreadLocal<MessageDigest> SHA_256_PROVIDER;
-
- static {
- E_TAG_PROVIDER = ThreadLocal.withInitial(() -> {
- try {
- return MessageDigest.getInstance(OzoneConsts.MD5_HASH);
- } catch (NoSuchAlgorithmException e) {
- throw new RuntimeException(e);
- }
- });
-
- SHA_256_PROVIDER = ThreadLocal.withInitial(() -> {
- try {
- return MessageDigest.getInstance(OzoneConsts.FILE_HASH);
- } catch (NoSuchAlgorithmException e) {
- throw new RuntimeException(e);
- }
- });
- }
-
/*FOR the feature Overriding Response Header
https://docs.aws.amazon.com/de_de/AmazonS3/latest/API/API_GetObject.html */
private final Map<String, String> overrideQueryParameter;
- private int bufferSize;
- private int chunkSize;
- private boolean datastreamEnabled;
- private long datastreamMinLength;
public ObjectEndpoint() {
overrideQueryParameter = ImmutableMap.<String, String>builder()
@@ -195,23 +143,6 @@ public ObjectEndpoint() {
.build();
}
- @Override
- protected void init() {
- bufferSize = (int) getOzoneConfiguration().getStorageSize(
- OZONE_S3G_CLIENT_BUFFER_SIZE_KEY,
- OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT, StorageUnit.BYTES);
- chunkSize = (int) getOzoneConfiguration().getStorageSize(
- OZONE_SCM_CHUNK_SIZE_KEY,
- OZONE_SCM_CHUNK_SIZE_DEFAULT,
- StorageUnit.BYTES);
- datastreamEnabled = getOzoneConfiguration().getBoolean(
- HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED,
- HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED_DEFAULT);
- datastreamMinLength = (long) getOzoneConfiguration().getStorageSize(
- OZONE_FS_DATASTREAM_AUTO_THRESHOLD,
- OZONE_FS_DATASTREAM_AUTO_THRESHOLD_DEFAULT, StorageUnit.BYTES);
- }
-
/**
* Rest endpoint to upload object to a bucket.
* <p>
@@ -315,10 +246,10 @@ public Response put(
long putLength;
String eTag = null;
- if (datastreamEnabled && !enableEC && length > datastreamMinLength) {
+ if (isDatastreamEnabled() && !enableEC && length > getDatastreamMinLength()) {
perf.appendStreamMode();
Pair<String, Long> keyWriteResult = ObjectEndpointStreaming
- .put(bucket, keyPath, length, replicationConfig, chunkSize,
+ .put(bucket, keyPath, length, replicationConfig, getChunkSize(),
customMetadata, tags, multiDigestInputStream, getHeaders(),
signatureInfo.isSignPayload(), perf);
eTag = keyWriteResult.getKey();
putLength = keyWriteResult.getValue();
@@ -862,17 +793,6 @@ public Response initializeMultipartUpload(
}
}
- private ReplicationConfig getReplicationConfig(OzoneBucket ozoneBucket) throws OS3Exception {
- String storageType = getHeaders().getHeaderString(STORAGE_CLASS_HEADER);
- String storageConfig = getHeaders().getHeaderString(CUSTOM_METADATA_HEADER_PREFIX + STORAGE_CONFIG_HEADER);
-
- ReplicationConfig clientConfiguredReplicationConfig =
- OzoneClientUtils.getClientConfiguredReplicationConfig(getOzoneConfiguration());
-
- return S3Utils.resolveS3ClientSideReplicationConfig(storageType, storageConfig,
- clientConfiguredReplicationConfig, ozoneBucket.getReplicationConfig());
- }
-
/**
* Complete a multipart upload.
*/
@@ -980,11 +900,11 @@ private Response createMultipartKey(OzoneVolume volume, OzoneBucket ozoneBucket,
enableEC = true;
}
- if (datastreamEnabled && !enableEC && copyHeader == null) {
+ if (isDatastreamEnabled() && !enableEC && copyHeader == null) {
perf.appendStreamMode();
return ObjectEndpointStreaming
.createMultipartKey(ozoneBucket, key, length, partNumber,
- uploadID, chunkSize, multiDigestInputStream, perf);
+ uploadID, getChunkSize(), multiDigestInputStream, perf);
}
// OmMultipartCommitUploadPartInfo can only be gotten after the
// OzoneOutputStream is closed, so we need to save the OzoneOutputStream
@@ -1193,13 +1113,13 @@ void copy(OzoneVolume volume, DigestInputStream src, long srcKeyLen,
Map<String, String> tags)
throws IOException {
long copyLength;
- if (datastreamEnabled && !(replication != null &&
+ if (isDatastreamEnabled() && !(replication != null &&
replication.getReplicationType() == EC) &&
- srcKeyLen > datastreamMinLength) {
+ srcKeyLen > getDatastreamMinLength()) {
perf.appendStreamMode();
copyLength = ObjectEndpointStreaming
.copyKeyWithStream(volume.getBucket(destBucket), destKey, srcKeyLen,
- chunkSize, replication, metadata, src, perf, startNanos, tags);
+ getChunkSize(), replication, metadata, src, perf, startNanos, tags);
} else {
try (OzoneOutputStream dest = getClientProtocol()
.createKey(volume.getName(), destBucket, destKey, srcKeyLen,
@@ -1335,89 +1255,6 @@ private CopyObjectResponse copyObject(OzoneVolume volume,
}
}
- /**
- * Parse the key and bucket name from copy header.
- */
- @VisibleForTesting
- public static Pair<String, String> parseSourceHeader(String copyHeader)
- throws OS3Exception {
- String header = copyHeader;
- if (header.startsWith("/")) {
- header = copyHeader.substring(1);
- }
- int pos = header.indexOf('/');
- if (pos == -1) {
- OS3Exception ex = newError(INVALID_ARGUMENT, header);
- ex.setErrorMessage("Copy Source must mention the source bucket and " +
- "key: sourcebucket/sourcekey");
- throw ex;
- }
-
- try {
- String bucket = header.substring(0, pos);
- String key = urlDecode(header.substring(pos + 1));
- return Pair.of(bucket, key);
- } catch (UnsupportedEncodingException e) {
- OS3Exception ex = newError(INVALID_ARGUMENT, header, e);
- ex.setErrorMessage("Copy Source header could not be url-decoded");
- throw ex;
- }
- }
-
- private static int parsePartNumberMarker(String partNumberMarker) {
- int partMarker = 0;
- if (partNumberMarker != null) {
- partMarker = Integer.parseInt(partNumberMarker);
- }
- return partMarker;
- }
-
- // Parses date string and return long representation. Returns an
- // empty if DateStr is null or invalid. Dates in the future are
- // considered invalid.
- private static OptionalLong parseAndValidateDate(String ozoneDateStr) {
- long ozoneDateInMs;
- if (ozoneDateStr == null) {
- return OptionalLong.empty();
- }
- try {
- ozoneDateInMs = OzoneUtils.formatDate(ozoneDateStr);
- } catch (ParseException e) {
- // if time not parseable, then return empty()
- return OptionalLong.empty();
- }
-
- long currentDate = System.currentTimeMillis();
- if (ozoneDateInMs <= currentDate) {
- return OptionalLong.of(ozoneDateInMs);
- } else {
- // dates in the future are invalid, so return empty()
- return OptionalLong.empty();
- }
- }
-
- public static boolean checkCopySourceModificationTime(
- Long lastModificationTime,
- String copySourceIfModifiedSinceStr,
- String copySourceIfUnmodifiedSinceStr) {
- long copySourceIfModifiedSince = Long.MIN_VALUE;
- long copySourceIfUnmodifiedSince = Long.MAX_VALUE;
-
- OptionalLong modifiedDate =
- parseAndValidateDate(copySourceIfModifiedSinceStr);
- if (modifiedDate.isPresent()) {
- copySourceIfModifiedSince = modifiedDate.getAsLong();
- }
-
- OptionalLong unmodifiedDate =
- parseAndValidateDate(copySourceIfUnmodifiedSinceStr);
- if (unmodifiedDate.isPresent()) {
- copySourceIfUnmodifiedSince = unmodifiedDate.getAsLong();
- }
- return (copySourceIfModifiedSince <= lastModificationTime) &&
- (lastModificationTime <= copySourceIfUnmodifiedSince);
- }
-
private Response putObjectTagging(OzoneBucket bucket, String keyName,
InputStream body)
throws IOException, OS3Exception {
long startNanos = Time.monotonicNowNanos();
@@ -1483,99 +1320,4 @@ private Response deleteObjectTagging(OzoneVolume volume, String bucketName, Stri
getMetrics().updateDeleteObjectTaggingSuccessStats(startNanos);
return Response.noContent().build();
}
-
- @VisibleForTesting
- public boolean isDatastreamEnabled() {
- return datastreamEnabled;
- }
-
- @VisibleForTesting
- public MessageDigest getMessageDigestInstance() {
- return E_TAG_PROVIDER.get();
- }
-
- @VisibleForTesting
- public MessageDigest getSha256DigestInstance() {
- return SHA_256_PROVIDER.get();
- }
-
- private String extractPartsCount(String eTag) {
- if (eTag.contains("-")) {
- String[] parts = eTag.replace("\"", "").split("-");
- String lastPart = parts[parts.length - 1];
- return lastPart.matches("\\d+") ? lastPart : null;
- }
- return null;
- }
-
- private int getIOBufferSize(long fileLength) {
- if (bufferSize == 0) {
- // this is mainly for unit tests as init() will not be called in the unit tests
- LOG.warn("buffer size is set to {}", IOUtils.DEFAULT_BUFFER_SIZE);
- bufferSize = IOUtils.DEFAULT_BUFFER_SIZE;
- }
- if (fileLength == 0) {
- // for empty file
- return bufferSize;
- } else {
- return fileLength < bufferSize ? (int) fileLength : bufferSize;
- }
- }
-
- /**
- * Create a {@link S3ChunkInputStreamInfo} that contains the necessary information to handle
- * the S3 chunk upload.
- */
- private S3ChunkInputStreamInfo getS3ChunkInputStreamInfo(
- InputStream body, long contentLength, String amzDecodedLength, String keyPath) throws OS3Exception {
- final String amzContentSha256Header = validateSignatureHeader(getHeaders(), keyPath, signatureInfo.isSignPayload());
- final InputStream chunkInputStream;
- final long effectiveLength;
- if (hasMultiChunksPayload(amzContentSha256Header)) {
- validateMultiChunksUpload(getHeaders(), amzDecodedLength, keyPath);
- if (hasUnsignedPayload(amzContentSha256Header)) {
- chunkInputStream = new UnsignedChunksInputStream(body);
- } else {
- chunkInputStream = new SignedChunksInputStream(body);
- }
- effectiveLength = Long.parseLong(amzDecodedLength);
- } else {
- // Single chunk upload: https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-header-based-auth.html
- // Possible x-amz-content-sha256 header values
- // - Actual payload checksum value: For signed payload
- // - UNSIGNED-PAYLOAD: For unsigned payload
- chunkInputStream = body;
- effectiveLength = contentLength;
- }
-
- // MessageDigest is used for ETag calculation
- // and Sha256Digest is used for "x-amz-content-sha256" header verification
- List<MessageDigest> digests = new ArrayList<>();
- digests.add(getMessageDigestInstance());
- if (!hasUnsignedPayload(amzContentSha256Header) && !hasMultiChunksPayload(amzContentSha256Header)) {
- digests.add(getSha256DigestInstance());
- }
- MultiDigestInputStream multiDigestInputStream =
- new MultiDigestInputStream(chunkInputStream, digests);
- return new S3ChunkInputStreamInfo(multiDigestInputStream, effectiveLength);
- }
-
- @Immutable
- static final class S3ChunkInputStreamInfo {
- private final MultiDigestInputStream multiDigestInputStream;
- private final long effectiveLength;
-
- S3ChunkInputStreamInfo(MultiDigestInputStream multiDigestInputStream, long effectiveLength) {
- this.multiDigestInputStream = multiDigestInputStream;
- this.effectiveLength = effectiveLength;
- }
-
- public MultiDigestInputStream getMultiDigestInputStream() {
- return multiDigestInputStream;
- }
-
- public long getEffectiveLength() {
- return effectiveLength;
- }
- }
}
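The practical effect of the pull-up: the buffer, chunk and datastream settings are now read once in EndpointBase.initialization(), and helpers such as parseSourceHeader, checkCopySourceModificationTime, getChunkSize and getIOBufferSize are available to every endpoint subclass instead of being private to ObjectEndpoint. A minimal sketch of how another subclass could reuse them; the class CopyHelperEndpoint and its handleCopy method are hypothetical names for illustration only and are not part of this commit:

package org.apache.hadoop.ozone.s3.endpoint;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;

// Hypothetical subclass reusing the helpers pulled up into EndpointBase.
public class CopyHelperEndpoint extends EndpointBase {

  @Override
  protected void init() {
    // Nothing to read here: buffer size, chunk size and datastream settings
    // are initialized in EndpointBase.initialization() after this change.
  }

  void handleCopy(String copySourceHeader, long sourceLength, long sourceLastModified,
      String ifModifiedSince, String ifUnmodifiedSince) throws OS3Exception {
    // Inherited static helper: splits "sourcebucket/sourcekey" and URL-decodes the key.
    Pair<String, String> source = parseSourceHeader(copySourceHeader);
    String sourceBucket = source.getLeft();
    String sourceKey = source.getRight();

    // Inherited helper: evaluates the copy-source if-modified/if-unmodified preconditions.
    boolean preconditionsOk = checkCopySourceModificationTime(
        sourceLastModified, ifModifiedSince, ifUnmodifiedSince);

    // Inherited accessors replace the private fields ObjectEndpoint previously kept.
    if (preconditionsOk && isDatastreamEnabled() && sourceLength > getDatastreamMinLength()) {
      int chunkSize = getChunkSize();
      int ioBufferSize = getIOBufferSize(sourceLength);
      // ... stream the copy of sourceBucket/sourceKey using chunkSize and ioBufferSize ...
    }
  }
}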
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]