This is an automated email from the ASF dual-hosted git repository.

peterxcli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new cd2e68c502e HDDS-14427. Pull up ObjectEndpoint helpers into EndpointBase (#9636)
cd2e68c502e is described below

commit cd2e68c502e7271e18e87a3fdd104a8bd89109ca
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Thu Jan 15 06:27:27 2026 +0100

    HDDS-14427. Pull up ObjectEndpoint helpers into EndpointBase (#9636)
---
 .../hadoop/ozone/s3/endpoint/EndpointBase.java     | 266 ++++++++++++++++++++
 .../hadoop/ozone/s3/endpoint/ObjectEndpoint.java   | 272 +--------------------
 2 files changed, 273 insertions(+), 265 deletions(-)

diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
index 356272ccd40..02a09002b06 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
@@ -18,21 +18,42 @@
 package org.apache.hadoop.ozone.s3.endpoint;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConsts.ETAG;
 import static org.apache.hadoop.ozone.OzoneConsts.KB;
+import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_KEY;
+import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_ARGUMENT;
 import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_TAG;
 import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.newError;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.AWS_TAG_PREFIX;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.CUSTOM_METADATA_HEADER_PREFIX;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CONFIG_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_KEY_LENGTH_LIMIT;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_NUM_LIMIT;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_REGEX_PATTERN;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_VALUE_LENGTH_LIMIT;
+import static org.apache.hadoop.ozone.s3.util.S3Utils.hasMultiChunksPayload;
+import static org.apache.hadoop.ozone.s3.util.S3Utils.hasUnsignedPayload;
+import static org.apache.hadoop.ozone.s3.util.S3Utils.urlDecode;
+import static org.apache.hadoop.ozone.s3.util.S3Utils.validateMultiChunksUpload;
+import static org.apache.hadoop.ozone.s3.util.S3Utils.validateSignatureHeader;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.text.ParseException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -40,6 +61,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -51,8 +73,13 @@
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
+import net.jcip.annotations.Immutable;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.AuditAction;
 import org.apache.hadoop.ozone.audit.AuditEventStatus;
@@ -62,19 +89,25 @@
 import org.apache.hadoop.ozone.audit.AuditMessage;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.client.OzoneKey;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.om.protocol.S3Auth;
+import org.apache.hadoop.ozone.s3.MultiDigestInputStream;
 import org.apache.hadoop.ozone.s3.RequestIdentifier;
+import org.apache.hadoop.ozone.s3.SignedChunksInputStream;
+import org.apache.hadoop.ozone.s3.UnsignedChunksInputStream;
 import org.apache.hadoop.ozone.s3.commontypes.RequestParameters;
 import org.apache.hadoop.ozone.s3.exception.OS3Exception;
 import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
 import org.apache.hadoop.ozone.s3.metrics.S3GatewayMetrics;
 import org.apache.hadoop.ozone.s3.signature.SignatureInfo;
 import org.apache.hadoop.ozone.s3.util.AuditUtils;
+import org.apache.hadoop.ozone.s3.util.S3Utils;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.http.NameValuePair;
 import org.apache.http.client.utils.URLEncodedUtils;
@@ -88,6 +121,27 @@ public abstract class EndpointBase {
 
   protected static final String ETAG_CUSTOM = "etag-custom";
 
+  private static final ThreadLocal<MessageDigest> E_TAG_PROVIDER;
+  private static final ThreadLocal<MessageDigest> SHA_256_PROVIDER;
+
+  static {
+    E_TAG_PROVIDER = ThreadLocal.withInitial(() -> {
+      try {
+        return MessageDigest.getInstance(OzoneConsts.MD5_HASH);
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+    });
+
+    SHA_256_PROVIDER = ThreadLocal.withInitial(() -> {
+      try {
+        return MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
   @Inject
   private OzoneConfiguration ozoneConfiguration;
 
@@ -100,6 +154,10 @@ public abstract class EndpointBase {
   private RequestIdentifier requestIdentifier;
 
   private S3Auth s3Auth;
+  private int bufferSize;
+  private int chunkSize;
+  private boolean datastreamEnabled;
+  private long datastreamMinLength;
 
   @Context
   private ContainerRequestContext context;
@@ -166,6 +224,21 @@ public void initialization() {
         getClient().getObjectStore().getClientProxy();
     clientProtocol.setThreadLocalS3Auth(s3Auth);
     clientProtocol.setIsS3Request(true);
+
+    bufferSize = (int) getOzoneConfiguration().getStorageSize(
+        OZONE_S3G_CLIENT_BUFFER_SIZE_KEY,
+        OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT, StorageUnit.BYTES);
+    chunkSize = (int) getOzoneConfiguration().getStorageSize(
+        OZONE_SCM_CHUNK_SIZE_KEY,
+        OZONE_SCM_CHUNK_SIZE_DEFAULT,
+        StorageUnit.BYTES);
+    datastreamEnabled = getOzoneConfiguration().getBoolean(
+        HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED,
+        HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED_DEFAULT);
+    datastreamMinLength = (long) getOzoneConfiguration().getStorageSize(
+        OZONE_FS_DATASTREAM_AUTO_THRESHOLD,
+        OZONE_FS_DATASTREAM_AUTO_THRESHOLD_DEFAULT, StorageUnit.BYTES);
+
     init();
   }
 
@@ -603,4 +676,197 @@ protected boolean isAccessDenied(OMException ex) {
     return result == ResultCodes.PERMISSION_DENIED
         || result == ResultCodes.INVALID_TOKEN;
   }
+
+  protected ReplicationConfig getReplicationConfig(OzoneBucket ozoneBucket) throws OS3Exception {
+    String storageType = getHeaders().getHeaderString(STORAGE_CLASS_HEADER);
+    String storageConfig = getHeaders().getHeaderString(CUSTOM_METADATA_HEADER_PREFIX + STORAGE_CONFIG_HEADER);
+
+    ReplicationConfig clientConfiguredReplicationConfig =
+        OzoneClientUtils.getClientConfiguredReplicationConfig(getOzoneConfiguration());
+
+    return S3Utils.resolveS3ClientSideReplicationConfig(storageType, storageConfig,
+        clientConfiguredReplicationConfig, ozoneBucket.getReplicationConfig());
+  }
+
+  /**
+   * Parse the key and bucket name from copy header.
+   */
+  public static Pair<String, String> parseSourceHeader(String copyHeader)
+      throws OS3Exception {
+    String header = copyHeader;
+    if (header.startsWith("/")) {
+      header = copyHeader.substring(1);
+    }
+    int pos = header.indexOf('/');
+    if (pos == -1) {
+      OS3Exception ex = newError(INVALID_ARGUMENT, header);
+      ex.setErrorMessage("Copy Source must mention the source bucket and " +
+          "key: sourcebucket/sourcekey");
+      throw ex;
+    }
+
+    try {
+      String bucket = header.substring(0, pos);
+      String key = urlDecode(header.substring(pos + 1));
+      return Pair.of(bucket, key);
+    } catch (UnsupportedEncodingException e) {
+      OS3Exception ex = newError(INVALID_ARGUMENT, header, e);
+      ex.setErrorMessage("Copy Source header could not be url-decoded");
+      throw ex;
+    }
+  }
+
+  protected static int parsePartNumberMarker(String partNumberMarker) {
+    int partMarker = 0;
+    if (partNumberMarker != null) {
+      partMarker = Integer.parseInt(partNumberMarker);
+    }
+    return partMarker;
+  }
+
+  // Parses date string and return long representation. Returns an
+  // empty if DateStr is null or invalid. Dates in the future are
+  // considered invalid.
+  private static OptionalLong parseAndValidateDate(String ozoneDateStr) {
+    long ozoneDateInMs;
+    if (ozoneDateStr == null) {
+      return OptionalLong.empty();
+    }
+    try {
+      ozoneDateInMs = OzoneUtils.formatDate(ozoneDateStr);
+    } catch (ParseException e) {
+      // if time not parseable, then return empty()
+      return OptionalLong.empty();
+    }
+
+    long currentDate = System.currentTimeMillis();
+    if (ozoneDateInMs <= currentDate) {
+      return OptionalLong.of(ozoneDateInMs);
+    } else {
+      // dates in the future are invalid, so return empty()
+      return OptionalLong.empty();
+    }
+  }
+
+  public static boolean checkCopySourceModificationTime(
+      Long lastModificationTime,
+      String copySourceIfModifiedSinceStr,
+      String copySourceIfUnmodifiedSinceStr) {
+    long copySourceIfModifiedSince = Long.MIN_VALUE;
+    long copySourceIfUnmodifiedSince = Long.MAX_VALUE;
+
+    OptionalLong modifiedDate =
+        parseAndValidateDate(copySourceIfModifiedSinceStr);
+    if (modifiedDate.isPresent()) {
+      copySourceIfModifiedSince = modifiedDate.getAsLong();
+    }
+
+    OptionalLong unmodifiedDate =
+        parseAndValidateDate(copySourceIfUnmodifiedSinceStr);
+    if (unmodifiedDate.isPresent()) {
+      copySourceIfUnmodifiedSince = unmodifiedDate.getAsLong();
+    }
+    return (copySourceIfModifiedSince <= lastModificationTime) &&
+        (lastModificationTime <= copySourceIfUnmodifiedSince);
+  }
+
+  /**
+   * Create a {@link S3ChunkInputStreamInfo} that contains the necessary information to handle
+   * the S3 chunk upload.
+   */
+  protected S3ChunkInputStreamInfo getS3ChunkInputStreamInfo(
+      InputStream body, long contentLength, String amzDecodedLength, String keyPath) throws OS3Exception {
+    final String amzContentSha256Header = validateSignatureHeader(getHeaders(), keyPath, signatureInfo.isSignPayload());
+    final InputStream chunkInputStream;
+    final long effectiveLength;
+    if (hasMultiChunksPayload(amzContentSha256Header)) {
+      validateMultiChunksUpload(getHeaders(), amzDecodedLength, keyPath);
+      if (hasUnsignedPayload(amzContentSha256Header)) {
+        chunkInputStream = new UnsignedChunksInputStream(body);
+      } else {
+        chunkInputStream = new SignedChunksInputStream(body);
+      }
+      effectiveLength = Long.parseLong(amzDecodedLength);
+    } else {
+      // Single chunk upload: https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-header-based-auth.html
+      // Possible x-amz-content-sha256 header values
+      // - Actual payload checksum value: For signed payload
+      // - UNSIGNED-PAYLOAD: For unsigned payload
+      chunkInputStream = body;
+      effectiveLength = contentLength;
+    }
+
+    // MessageDigest is used for ETag calculation
+    // and Sha256Digest is used for "x-amz-content-sha256" header verification
+    List<MessageDigest> digests = new ArrayList<>();
+    digests.add(getMessageDigestInstance());
+    if (!hasUnsignedPayload(amzContentSha256Header) && !hasMultiChunksPayload(amzContentSha256Header)) {
+      digests.add(getSha256DigestInstance());
+    }
+    MultiDigestInputStream multiDigestInputStream =
+        new MultiDigestInputStream(chunkInputStream, digests);
+    return new S3ChunkInputStreamInfo(multiDigestInputStream, effectiveLength);
+  }
+
+  public boolean isDatastreamEnabled() {
+    return datastreamEnabled;
+  }
+
+  public long getDatastreamMinLength() {
+    return datastreamMinLength;
+  }
+
+  public int getChunkSize() {
+    return chunkSize;
+  }
+
+  public MessageDigest getMessageDigestInstance() {
+    return E_TAG_PROVIDER.get();
+  }
+
+  public MessageDigest getSha256DigestInstance() {
+    return SHA_256_PROVIDER.get();
+  }
+
+  protected static String extractPartsCount(String eTag) {
+    if (eTag.contains("-")) {
+      String[] parts = eTag.replace("\"", "").split("-");
+      String lastPart = parts[parts.length - 1];
+      return lastPart.matches("\\d+") ? lastPart : null;
+    }
+    return null;
+  }
+
+  protected int getIOBufferSize(long fileLength) {
+    if (bufferSize == 0) {
+      // this is mainly for unit tests as init() will not be called in the unit tests
+      LOG.warn("buffer size is set to {}", IOUtils.DEFAULT_BUFFER_SIZE);
+      bufferSize = IOUtils.DEFAULT_BUFFER_SIZE;
+    }
+    if (fileLength == 0) {
+      // for empty file
+      return bufferSize;
+    } else {
+      return fileLength < bufferSize ? (int) fileLength : bufferSize;
+    }
+  }
+
+  @Immutable
+  protected static final class S3ChunkInputStreamInfo {
+    private final MultiDigestInputStream multiDigestInputStream;
+    private final long effectiveLength;
+
+    S3ChunkInputStreamInfo(MultiDigestInputStream multiDigestInputStream, long effectiveLength) {
+      this.multiDigestInputStream = multiDigestInputStream;
+      this.effectiveLength = effectiveLength;
+    }
+
+    public MultiDigestInputStream getMultiDigestInputStream() {
+      return multiDigestInputStream;
+    }
+
+    public long getEffectiveLength() {
+      return effectiveLength;
+    }
+  }
 }
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 773b3688688..6589083b96d 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -18,15 +18,7 @@
 package org.apache.hadoop.ozone.s3.endpoint;
 
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.ozone.audit.AuditLogger.PerformanceStringBuilder;
-import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT;
-import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_KEY;
 import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_FSO_DIRECTORY_CREATION_ENABLED;
 import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_FSO_DIRECTORY_CREATION_ENABLED_DEFAULT;
 import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.ENTITY_TOO_SMALL;
@@ -43,43 +35,31 @@
 import static org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_IF_MODIFIED_SINCE;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_IF_UNMODIFIED_SINCE;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.CUSTOM_METADATA_COPY_DIRECTIVE_HEADER;
-import static org.apache.hadoop.ozone.s3.util.S3Consts.CUSTOM_METADATA_HEADER_PREFIX;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.CopyDirective;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.DECODED_CONTENT_LENGTH_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.MP_PARTS_COUNT;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.RANGE_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.RANGE_HEADER_SUPPORTED_UNIT;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
-import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CONFIG_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_COUNT_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_DIRECTIVE_HEADER;
-import static org.apache.hadoop.ozone.s3.util.S3Utils.hasMultiChunksPayload;
-import static org.apache.hadoop.ozone.s3.util.S3Utils.hasUnsignedPayload;
 import static org.apache.hadoop.ozone.s3.util.S3Utils.stripQuotes;
-import static org.apache.hadoop.ozone.s3.util.S3Utils.urlDecode;
-import static org.apache.hadoop.ozone.s3.util.S3Utils.validateMultiChunksUpload;
 import static org.apache.hadoop.ozone.s3.util.S3Utils.validateSignatureHeader;
 import static org.apache.hadoop.ozone.s3.util.S3Utils.wrapInQuotes;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
 import java.security.DigestInputStream;
 import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.text.ParseException;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.OptionalLong;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
@@ -98,17 +78,14 @@
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
 import javax.xml.bind.DatatypeConverter;
-import net.jcip.annotations.Immutable;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.S3GAction;
 import org.apache.hadoop.ozone.client.OzoneBucket;
-import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.client.OzoneKey;
 import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
@@ -124,8 +101,6 @@
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
 import org.apache.hadoop.ozone.s3.HeaderPreprocessor;
 import org.apache.hadoop.ozone.s3.MultiDigestInputStream;
-import org.apache.hadoop.ozone.s3.SignedChunksInputStream;
-import org.apache.hadoop.ozone.s3.UnsignedChunksInputStream;
 import org.apache.hadoop.ozone.s3.endpoint.S3Tagging.Tag;
 import org.apache.hadoop.ozone.s3.exception.OS3Exception;
 import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
@@ -135,8 +110,6 @@
 import org.apache.hadoop.ozone.s3.util.S3Consts;
 import org.apache.hadoop.ozone.s3.util.S3Consts.QueryParams;
 import org.apache.hadoop.ozone.s3.util.S3StorageType;
-import org.apache.hadoop.ozone.s3.util.S3Utils;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.http.HttpStatus;
 import org.apache.ratis.util.function.CheckedRunnable;
@@ -155,34 +128,9 @@ public class ObjectEndpoint extends EndpointBase {
   private static final Logger LOG =
       LoggerFactory.getLogger(ObjectEndpoint.class);
 
-  private static final ThreadLocal<MessageDigest> E_TAG_PROVIDER;
-  private static final ThreadLocal<MessageDigest> SHA_256_PROVIDER;
-
-  static {
-    E_TAG_PROVIDER = ThreadLocal.withInitial(() -> {
-      try {
-        return MessageDigest.getInstance(OzoneConsts.MD5_HASH);
-      } catch (NoSuchAlgorithmException e) {
-        throw new RuntimeException(e);
-      }
-    });
-
-    SHA_256_PROVIDER = ThreadLocal.withInitial(() -> {
-      try {
-        return MessageDigest.getInstance(OzoneConsts.FILE_HASH);
-      } catch (NoSuchAlgorithmException e) {
-        throw new RuntimeException(e);
-      }
-    });
-  }
-
   /*FOR the feature Overriding Response Header
   https://docs.aws.amazon.com/de_de/AmazonS3/latest/API/API_GetObject.html */
   private final Map<String, String> overrideQueryParameter;
-  private int bufferSize;
-  private int chunkSize;
-  private boolean datastreamEnabled;
-  private long datastreamMinLength;
 
   public ObjectEndpoint() {
     overrideQueryParameter = ImmutableMap.<String, String>builder()
@@ -195,23 +143,6 @@ public ObjectEndpoint() {
         .build();
   }
 
-  @Override
-  protected void init() {
-    bufferSize = (int) getOzoneConfiguration().getStorageSize(
-        OZONE_S3G_CLIENT_BUFFER_SIZE_KEY,
-        OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT, StorageUnit.BYTES);
-    chunkSize = (int) getOzoneConfiguration().getStorageSize(
-        OZONE_SCM_CHUNK_SIZE_KEY,
-        OZONE_SCM_CHUNK_SIZE_DEFAULT,
-        StorageUnit.BYTES);
-    datastreamEnabled = getOzoneConfiguration().getBoolean(
-        HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED,
-        HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED_DEFAULT);
-    datastreamMinLength = (long) getOzoneConfiguration().getStorageSize(
-        OZONE_FS_DATASTREAM_AUTO_THRESHOLD,
-        OZONE_FS_DATASTREAM_AUTO_THRESHOLD_DEFAULT, StorageUnit.BYTES);
-  }
-
   /**
    * Rest endpoint to upload object to a bucket.
    * <p>
@@ -315,10 +246,10 @@ public Response put(
 
       long putLength;
       String eTag = null;
-      if (datastreamEnabled && !enableEC && length > datastreamMinLength) {
+      if (isDatastreamEnabled() && !enableEC && length > getDatastreamMinLength()) {
         perf.appendStreamMode();
         Pair<String, Long> keyWriteResult = ObjectEndpointStreaming
-            .put(bucket, keyPath, length, replicationConfig, chunkSize,
+            .put(bucket, keyPath, length, replicationConfig, getChunkSize(),
                 customMetadata, tags, multiDigestInputStream, getHeaders(), signatureInfo.isSignPayload(), perf);
         eTag = keyWriteResult.getKey();
         putLength = keyWriteResult.getValue();
@@ -862,17 +793,6 @@ public Response initializeMultipartUpload(
     }
   }
 
-  private ReplicationConfig getReplicationConfig(OzoneBucket ozoneBucket) throws OS3Exception {
-    String storageType = getHeaders().getHeaderString(STORAGE_CLASS_HEADER);
-    String storageConfig = getHeaders().getHeaderString(CUSTOM_METADATA_HEADER_PREFIX + STORAGE_CONFIG_HEADER);
-
-    ReplicationConfig clientConfiguredReplicationConfig =
-        OzoneClientUtils.getClientConfiguredReplicationConfig(getOzoneConfiguration());
-
-    return S3Utils.resolveS3ClientSideReplicationConfig(storageType, storageConfig,
-        clientConfiguredReplicationConfig, ozoneBucket.getReplicationConfig());
-  }
-
   /**
    * Complete a multipart upload.
    */
@@ -980,11 +900,11 @@ private Response createMultipartKey(OzoneVolume volume, OzoneBucket ozoneBucket,
         enableEC = true;
       }
 
-      if (datastreamEnabled && !enableEC && copyHeader == null) {
+      if (isDatastreamEnabled() && !enableEC && copyHeader == null) {
         perf.appendStreamMode();
         return ObjectEndpointStreaming
             .createMultipartKey(ozoneBucket, key, length, partNumber,
-                uploadID, chunkSize, multiDigestInputStream, perf);
+                uploadID, getChunkSize(), multiDigestInputStream, perf);
       }
       // OmMultipartCommitUploadPartInfo can only be gotten after the
       // OzoneOutputStream is closed, so we need to save the OzoneOutputStream
@@ -1193,13 +1113,13 @@ void copy(OzoneVolume volume, DigestInputStream src, long srcKeyLen,
       Map<String, String> tags)
       throws IOException {
     long copyLength;
-    if (datastreamEnabled && !(replication != null &&
+    if (isDatastreamEnabled() && !(replication != null &&
         replication.getReplicationType() == EC) &&
-        srcKeyLen > datastreamMinLength) {
+        srcKeyLen > getDatastreamMinLength()) {
       perf.appendStreamMode();
       copyLength = ObjectEndpointStreaming
           .copyKeyWithStream(volume.getBucket(destBucket), destKey, srcKeyLen,
-              chunkSize, replication, metadata, src, perf, startNanos, tags);
+              getChunkSize(), replication, metadata, src, perf, startNanos, tags);
     } else {
       try (OzoneOutputStream dest = getClientProtocol()
           .createKey(volume.getName(), destBucket, destKey, srcKeyLen,
@@ -1335,89 +1255,6 @@ private CopyObjectResponse copyObject(OzoneVolume volume,
     }
   }
 
-  /**
-   * Parse the key and bucket name from copy header.
-   */
-  @VisibleForTesting
-  public static Pair<String, String> parseSourceHeader(String copyHeader)
-      throws OS3Exception {
-    String header = copyHeader;
-    if (header.startsWith("/")) {
-      header = copyHeader.substring(1);
-    }
-    int pos = header.indexOf('/');
-    if (pos == -1) {
-      OS3Exception ex = newError(INVALID_ARGUMENT, header);
-      ex.setErrorMessage("Copy Source must mention the source bucket and " +
-          "key: sourcebucket/sourcekey");
-      throw ex;
-    }
-
-    try {
-      String bucket = header.substring(0, pos);
-      String key = urlDecode(header.substring(pos + 1));
-      return Pair.of(bucket, key);
-    } catch (UnsupportedEncodingException e) {
-      OS3Exception ex = newError(INVALID_ARGUMENT, header, e);
-      ex.setErrorMessage("Copy Source header could not be url-decoded");
-      throw ex;
-    }
-  }
-
-  private static int parsePartNumberMarker(String partNumberMarker) {
-    int partMarker = 0;
-    if (partNumberMarker != null) {
-      partMarker = Integer.parseInt(partNumberMarker);
-    }
-    return partMarker;
-  }
-
-  // Parses date string and return long representation. Returns an
-  // empty if DateStr is null or invalid. Dates in the future are
-  // considered invalid.
-  private static OptionalLong parseAndValidateDate(String ozoneDateStr) {
-    long ozoneDateInMs;
-    if (ozoneDateStr == null) {
-      return OptionalLong.empty();
-    }
-    try {
-      ozoneDateInMs = OzoneUtils.formatDate(ozoneDateStr);
-    } catch (ParseException e) {
-      // if time not parseable, then return empty()
-      return OptionalLong.empty();
-    }
-
-    long currentDate = System.currentTimeMillis();
-    if (ozoneDateInMs <= currentDate) {
-      return OptionalLong.of(ozoneDateInMs);
-    } else {
-      // dates in the future are invalid, so return empty()
-      return OptionalLong.empty();
-    }
-  }
-
-  public static boolean checkCopySourceModificationTime(
-      Long lastModificationTime,
-      String copySourceIfModifiedSinceStr,
-      String copySourceIfUnmodifiedSinceStr) {
-    long copySourceIfModifiedSince = Long.MIN_VALUE;
-    long copySourceIfUnmodifiedSince = Long.MAX_VALUE;
-
-    OptionalLong modifiedDate =
-        parseAndValidateDate(copySourceIfModifiedSinceStr);
-    if (modifiedDate.isPresent()) {
-      copySourceIfModifiedSince = modifiedDate.getAsLong();
-    }
-
-    OptionalLong unmodifiedDate =
-        parseAndValidateDate(copySourceIfUnmodifiedSinceStr);
-    if (unmodifiedDate.isPresent()) {
-      copySourceIfUnmodifiedSince = unmodifiedDate.getAsLong();
-    }
-    return (copySourceIfModifiedSince <= lastModificationTime) &&
-        (lastModificationTime <= copySourceIfUnmodifiedSince);
-  }
-
   private Response putObjectTagging(OzoneBucket bucket, String keyName, InputStream body)
       throws IOException, OS3Exception {
     long startNanos = Time.monotonicNowNanos();
@@ -1483,99 +1320,4 @@ private Response deleteObjectTagging(OzoneVolume volume, String bucketName, Stri
     getMetrics().updateDeleteObjectTaggingSuccessStats(startNanos);
     return Response.noContent().build();
   }
-
-  @VisibleForTesting
-  public boolean isDatastreamEnabled() {
-    return datastreamEnabled;
-  }
-
-  @VisibleForTesting
-  public MessageDigest getMessageDigestInstance() {
-    return E_TAG_PROVIDER.get();
-  }
-
-  @VisibleForTesting
-  public MessageDigest getSha256DigestInstance() {
-    return SHA_256_PROVIDER.get();
-  }
-
-  private String extractPartsCount(String eTag) {
-    if (eTag.contains("-")) {
-      String[] parts = eTag.replace("\"", "").split("-");
-      String lastPart = parts[parts.length - 1];
-      return lastPart.matches("\\d+") ? lastPart : null;
-    }
-    return null;
-  }
-
-  private int getIOBufferSize(long fileLength) {
-    if (bufferSize == 0) {
-      // this is mainly for unit tests as init() will not be called in the unit tests
-      LOG.warn("buffer size is set to {}", IOUtils.DEFAULT_BUFFER_SIZE);
-      bufferSize = IOUtils.DEFAULT_BUFFER_SIZE;
-    }
-    if (fileLength == 0) {
-      // for empty file
-      return bufferSize;
-    } else {
-      return fileLength < bufferSize ? (int) fileLength : bufferSize;
-    }
-  }
-
-  /**
-   * Create a {@link S3ChunkInputStreamInfo} that contains the necessary information to handle
-   * the S3 chunk upload.
-   */
-  private S3ChunkInputStreamInfo getS3ChunkInputStreamInfo(
-      InputStream body, long contentLength, String amzDecodedLength, String keyPath) throws OS3Exception {
-    final String amzContentSha256Header = validateSignatureHeader(getHeaders(), keyPath, signatureInfo.isSignPayload());
-    final InputStream chunkInputStream;
-    final long effectiveLength;
-    if (hasMultiChunksPayload(amzContentSha256Header)) {
-      validateMultiChunksUpload(getHeaders(), amzDecodedLength, keyPath);
-      if (hasUnsignedPayload(amzContentSha256Header)) {
-        chunkInputStream = new UnsignedChunksInputStream(body);
-      } else {
-        chunkInputStream = new SignedChunksInputStream(body);
-      }
-      effectiveLength = Long.parseLong(amzDecodedLength);
-    } else {
-      // Single chunk upload: https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-header-based-auth.html
-      // Possible x-amz-content-sha256 header values
-      // - Actual payload checksum value: For signed payload
-      // - UNSIGNED-PAYLOAD: For unsigned payload
-      chunkInputStream = body;
-      effectiveLength = contentLength;
-    }
-
-    // MessageDigest is used for ETag calculation
-    // and Sha256Digest is used for "x-amz-content-sha256" header verification
-    List<MessageDigest> digests = new ArrayList<>();
-    digests.add(getMessageDigestInstance());
-    if (!hasUnsignedPayload(amzContentSha256Header) && !hasMultiChunksPayload(amzContentSha256Header)) {
-      digests.add(getSha256DigestInstance());
-    }
-    MultiDigestInputStream multiDigestInputStream =
-        new MultiDigestInputStream(chunkInputStream, digests);
-    return new S3ChunkInputStreamInfo(multiDigestInputStream, effectiveLength);
-  }
-
-  @Immutable
-  static final class S3ChunkInputStreamInfo {
-    private final MultiDigestInputStream multiDigestInputStream;
-    private final long effectiveLength;
-
-    S3ChunkInputStreamInfo(MultiDigestInputStream multiDigestInputStream, long effectiveLength) {
-      this.multiDigestInputStream = multiDigestInputStream;
-      this.effectiveLength = effectiveLength;
-    }
-
-    public MultiDigestInputStream getMultiDigestInputStream() {
-      return multiDigestInputStream;
-    }
-
-    public long getEffectiveLength() {
-      return effectiveLength;
-    }
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to