This is an automated email from the ASF dual-hosted git repository.
ivandika3 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 8e2bdd9b9cf HDDS-15193. Move the "atomic key creation" logic from
output stream to S3 endpoints (#10202)
8e2bdd9b9cf is described below
commit 8e2bdd9b9cf946581c8ea5ba8720ed19ddbeff39
Author: Peter Lee <[email protected]>
AuthorDate: Sat May 23 15:07:02 2026 +0800
HDDS-15193. Move the "atomic key creation" logic from output stream to S3
endpoints (#10202)
---
.../hadoop/ozone/client/io/ECKeyOutputStream.java | 15 ----
.../ozone/client/io/KeyDataStreamOutput.java | 28 +------
.../hadoop/ozone/client/io/KeyOutputStream.java | 24 ------
.../hadoop/ozone/client/io/OzoneOutputStream.java | 26 +++++-
.../ozone/client/protocol/ClientProtocol.java | 2 -
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 20 +----
.../hadoop/ozone/client/TestOzoneClient.java | 34 --------
.../ozone/client/io/TestOzoneOutputStream.java | 45 +++++++++-
.../hadoop/ozone/s3/endpoint/EndpointBase.java | 16 +++-
.../hadoop/ozone/s3/endpoint/ObjectEndpoint.java | 63 +++++++-------
.../ozone/s3/endpoint/ObjectEndpointStreaming.java | 67 +++++----------
.../s3/endpoint/S3ObjectStreamingWriteGuard.java | 55 +++++++++++++
.../ozone/s3/endpoint/S3ObjectWriteGuard.java | 96 ++++++++++++++++++++++
.../hadoop/ozone/client/ClientProtocolStub.java | 5 --
.../hadoop/ozone/client/OzoneBucketStub.java | 17 ++--
.../ozone/s3/endpoint/EndpointTestUtils.java | 57 ++++++++++++-
.../hadoop/ozone/s3/endpoint/TestObjectPut.java | 53 ++++++++----
.../hadoop/ozone/s3/endpoint/TestPartUpload.java | 95 +++++++++++----------
.../ozone/s3/endpoint/TestUploadWithStream.java | 30 +++++++
19 files changed, 463 insertions(+), 285 deletions(-)
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index ee5c7548757..9f94384d4df 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -74,14 +74,6 @@ public final class ECKeyOutputStream extends KeyOutputStream
private final Future<Boolean> flushFuture;
private final AtomicLong flushCheckpoint;
- /**
- * Indicates if an atomic write is required. When set to true,
- * the amount of data written must match the declared size during the commit.
- * A mismatch will prevent the commit from succeeding.
- * This is essential for operations like S3 put to ensure atomicity.
- */
- private boolean atomicKeyCreation;
-
private volatile boolean closed;
private volatile boolean closing;
// how much of data is actually written yet to underlying stream
@@ -130,7 +122,6 @@ private ECKeyOutputStream(Builder builder) {
return flushStripeFromQueue();
});
this.flushCheckpoint = new AtomicLong(0);
- this.atomicKeyCreation = builder.getAtomicKeyCreation();
}
@Override
@@ -489,12 +480,6 @@ public void close() throws IOException {
Preconditions.checkArgument(writeOffset == offset,
"Expected writeOffset= " + writeOffset
+ " Expected offset=" + offset);
- if (atomicKeyCreation) {
- long expectedSize = blockOutputStreamEntryPool.getDataSize();
- Preconditions.checkState(expectedSize == offset, String.format(
- "Expected: %d and actual %d write sizes do not match",
- expectedSize, offset));
- }
for (CheckedRunnable<IOException> preCommit : preCommits) {
preCommit.run();
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index 7556f1e6d76..119af2f04e5 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -77,14 +77,6 @@ public class KeyDataStreamOutput extends
AbstractDataStreamOutput
private long clientID;
- /**
- * Indicates if an atomic write is required. When set to true,
- * the amount of data written must match the declared size during the commit.
- * A mismatch will prevent the commit from succeeding.
- * This is essential for operations like S3 put to ensure atomicity.
- */
- private boolean atomicKeyCreation;
-
private List<CheckedRunnable<IOException>> preCommits =
Collections.emptyList();
@Override
@@ -130,7 +122,6 @@ public KeyDataStreamOutput() {
this.writeOffset = 0;
this.clientID = 0L;
- this.atomicKeyCreation = false;
}
@SuppressWarnings({"parameternumber", "squid:S00107"})
@@ -141,8 +132,7 @@ public KeyDataStreamOutput(
OzoneManagerProtocol omClient, int chunkSize,
String requestId, ReplicationConfig replicationConfig,
String uploadID, int partNumber, boolean isMultipart,
- boolean unsafeByteBufferConversion,
- boolean atomicKeyCreation
+ boolean unsafeByteBufferConversion
) {
super(HddsClientUtils.getRetryPolicyByException(
config.getMaxRetryCount(), config.getRetryInterval()));
@@ -163,7 +153,6 @@ public KeyDataStreamOutput(
// encrypted bucket.
this.writeOffset = 0;
this.clientID = handler.getId();
- this.atomicKeyCreation = atomicKeyCreation;
}
/**
@@ -458,12 +447,6 @@ public void close() throws IOException {
if (!isException()) {
Preconditions.checkArgument(writeOffset == offset);
}
- if (atomicKeyCreation) {
- long expectedSize = blockDataStreamOutputEntryPool.getDataSize();
- Preconditions.checkArgument(expectedSize == offset,
- String.format("Expected: %d and actual %d write sizes do not
match",
- expectedSize, offset));
- }
for (CheckedRunnable<IOException> preCommit : preCommits) {
preCommit.run();
}
@@ -503,7 +486,6 @@ public static class Builder {
private boolean unsafeByteBufferConversion;
private OzoneClientConfig clientConfig;
private ReplicationConfig replicationConfig;
- private boolean atomicKeyCreation = false;
public Builder setMultipartUploadID(String uploadID) {
this.multipartUploadID = uploadID;
@@ -555,11 +537,6 @@ public Builder setReplicationConfig(ReplicationConfig
replConfig) {
return this;
}
- public Builder setAtomicKeyCreation(boolean atomicKey) {
- this.atomicKeyCreation = atomicKey;
- return this;
- }
-
public KeyDataStreamOutput build() {
return new KeyDataStreamOutput(
clientConfig,
@@ -572,8 +549,7 @@ public KeyDataStreamOutput build() {
multipartUploadID,
multipartNumber,
isMultipartKey,
- unsafeByteBufferConversion,
- atomicKeyCreation);
+ unsafeByteBufferConversion);
}
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 3a6499ea0c5..a3a1ca28030 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -98,13 +98,6 @@ public class KeyOutputStream extends OutputStream
private long clientID;
private StreamBufferArgs streamBufferArgs;
- /**
- * Indicates if an atomic write is required. When set to true,
- * the amount of data written must match the declared size during the commit.
- * A mismatch will prevent the commit from succeeding.
- * This is essential for operations like S3 put to ensure atomicity.
- */
- private boolean atomicKeyCreation;
private ContainerClientMetrics clientMetrics;
private OzoneManagerVersion ozoneManagerVersion;
private final Lock writeLock = new ReentrantLock();
@@ -187,7 +180,6 @@ public KeyOutputStream(Builder b) {
this.isException = false;
this.writeOffset = 0;
this.clientID = b.getOpenHandler().getId();
- this.atomicKeyCreation = b.getAtomicKeyCreation();
this.streamBufferArgs = b.getStreamBufferArgs();
this.clientMetrics = b.getClientMetrics();
this.ozoneManagerVersion = b.ozoneManagerVersion;
@@ -657,12 +649,6 @@ private void closeInternal() throws IOException {
if (!isException) {
Preconditions.checkArgument(writeOffset == offset);
}
- if (atomicKeyCreation) {
- long expectedSize = blockOutputStreamEntryPool.getDataSize();
- Preconditions.checkState(expectedSize == offset,
- String.format("Expected: %d and actual %d write sizes do not
match",
- expectedSize, offset));
- }
for (CheckedRunnable<IOException> preCommit : preCommits) {
preCommit.run();
}
@@ -703,7 +689,6 @@ public static class Builder {
private OzoneClientConfig clientConfig;
private ReplicationConfig replicationConfig;
private ContainerClientMetrics clientMetrics;
- private boolean atomicKeyCreation = false;
private StreamBufferArgs streamBufferArgs;
private Supplier<ExecutorService> executorServiceSupplier;
private OzoneManagerVersion ozoneManagerVersion;
@@ -802,11 +787,6 @@ public Builder setReplicationConfig(ReplicationConfig
replConfig) {
return this;
}
- public Builder setAtomicKeyCreation(boolean atomicKey) {
- this.atomicKeyCreation = atomicKey;
- return this;
- }
-
public Builder setClientMetrics(ContainerClientMetrics clientMetrics) {
this.clientMetrics = clientMetrics;
return this;
@@ -816,10 +796,6 @@ public ContainerClientMetrics getClientMetrics() {
return clientMetrics;
}
- public boolean getAtomicKeyCreation() {
- return atomicKeyCreation;
- }
-
public Builder setExecutorServiceSupplier(Supplier<ExecutorService>
executorServiceSupplier) {
this.executorServiceSupplier = executorServiceSupplier;
return this;
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
index c0e14b089ef..a7eda7da284 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
@@ -19,12 +19,14 @@
import java.io.IOException;
import java.io.OutputStream;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.hadoop.crypto.CryptoOutputStream;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.ratis.util.function.CheckedRunnable;
/**
* OzoneOutputStream is used to write data into Ozone.
@@ -128,9 +130,9 @@ public void hsync() throws IOException {
}
public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
- KeyOutputStream keyOutputStream = getKeyOutputStream();
- if (keyOutputStream != null) {
- return keyOutputStream.getCommitUploadPartInfo();
+ KeyCommitOutput keyCommitOutput = getKeyCommitOutput();
+ if (keyCommitOutput != null) {
+ return keyCommitOutput.getCommitUploadPartInfo();
}
// Otherwise return null.
return null;
@@ -139,12 +141,23 @@ public OmMultipartCommitUploadPartInfo
getCommitUploadPartInfo() {
public OutputStream getOutputStream() {
return outputStream;
}
-
+
public KeyOutputStream getKeyOutputStream() {
OutputStream base = unwrap(outputStream);
return base instanceof KeyOutputStream ? (KeyOutputStream) base : null;
}
+ public void setPreCommits(List<CheckedRunnable<IOException>> preCommits) {
+ KeyCommitOutput keyCommitOutput = getKeyCommitOutput();
+ if (keyCommitOutput != null) {
+ keyCommitOutput.setPreCommits(preCommits);
+ return;
+ }
+ throw new IllegalStateException(
+ "Output stream is not backed by KeyCommitOutput: " +
+ outputStream.getClass());
+ }
+
@Override
public Map<String, String> getMetadata() {
OutputStream base = unwrap(outputStream);
@@ -155,6 +168,11 @@ public Map<String, String> getMetadata() {
"OutputStream is not KeyMetadataAware: " + base.getClass());
}
+ private KeyCommitOutput getKeyCommitOutput() {
+ OutputStream base = unwrap(outputStream);
+ return base instanceof KeyCommitOutput ? (KeyCommitOutput) base : null;
+ }
+
private static OutputStream unwrap(OutputStream out) {
if (out instanceof CryptoOutputStream) {
return ((CryptoOutputStream) out).getWrappedStream();
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index c8611043fe4..8653048f8b2 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -1224,8 +1224,6 @@ OzoneKey headObject(String volumeName, String bucketName,
*/
void setThreadLocalS3Auth(S3Auth s3Auth);
- void setIsS3Request(boolean isS3Request);
-
/**
* Gets the S3 Authentication information that is attached to the thread.
* @return S3 Authentication information.
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 0b058362a30..d1f302a5cba 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -55,7 +55,6 @@
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.crypto.Cipher;
@@ -223,7 +222,6 @@ public class RpcClient implements ClientProtocol {
private final MemoizedSupplier<ExecutorService> ecReconstructExecutor;
private final ContainerClientMetrics clientMetrics;
private final MemoizedSupplier<ExecutorService> writeExecutor;
- private final AtomicBoolean isS3GRequest = new AtomicBoolean(false);
private volatile OzoneFsServerDefaults serverDefaults;
private volatile long serverDefaultsLastUpdate;
private final long serverDefaultsValidityPeriod;
@@ -1473,13 +1471,6 @@ private OmKeyArgs.Builder
createWriteKeyArgsBuilder(String volumeName,
private OzoneOutputStream openOutputStream(OmKeyArgs keyArgs, long size)
throws IOException {
OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
- // For bucket with layout OBJECT_STORE, when create an empty file (size=0),
- // OM will set DataSize to OzoneConfigKeys#OZONE_SCM_BLOCK_SIZE,
- // which will cause S3G's atomic write length check to fail,
- // so reset size to 0 here.
- if (isS3GRequest.get() && size == 0) {
- openKey.getKeyInfo().setDataSize(0);
- }
return createOutputStream(openKey);
}
@@ -2588,15 +2579,12 @@ private OzoneDataStreamOutput
createDataStreamOutput(OpenKeySession openKey)
}
private KeyDataStreamOutput.Builder newKeyOutputStreamBuilder() {
- // Amazon S3 never adds partial objects, So for S3 requests we need to
- // set atomicKeyCreation to true
// refer:
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
return new KeyDataStreamOutput.Builder()
.setXceiverClientManager(xceiverClientManager)
.setOmClient(ozoneManagerClient)
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
- .setConfig(clientConfig)
- .setAtomicKeyCreation(isS3GRequest.get());
+ .setConfig(clientConfig);
}
private OzoneOutputStream createOutputStream(OpenKeySession openKey)
@@ -2670,7 +2658,6 @@ private KeyOutputStream.Builder createKeyOutputStream(
.setOmClient(ozoneManagerClient)
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
.setConfig(clientConfig)
- .setAtomicKeyCreation(isS3GRequest.get())
.setClientMetrics(clientMetrics)
.setExecutorServiceSupplier(writeExecutor)
.setStreamBufferArgs(streamBufferArgs)
@@ -2774,11 +2761,6 @@ public void setThreadLocalS3Auth(
this.s3gUgi =
UserGroupInformation.createRemoteUser(getThreadLocalS3Auth().getUserPrincipal());
}
- @Override
- public void setIsS3Request(boolean s3Request) {
- this.isS3GRequest.set(s3Request);
- }
-
@Override
public S3Auth getThreadLocalS3Auth() {
return ozoneManagerClient.getThreadLocalS3Auth();
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
index 84b423a28ca..c96fe8bfc5b 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
@@ -220,40 +220,6 @@ public void testPutKeyWithECReplicationConfig() throws
IOException {
}
}
- /**
- * This test validates that for S3G,
- * the key upload process needs to be atomic.
- * It simulates two mismatch scenarios where the actual write data size does
- * not match the expected size.
- */
- @Test
- public void testPutKeySizeMismatch() throws IOException {
- String value = new String(new byte[1024], UTF_8);
- OzoneBucket bucket = getOzoneBucket();
- String keyName = UUID.randomUUID().toString();
- try {
- // Simulating first mismatch: Write less data than expected
- client.getProxy().setIsS3Request(true);
- OzoneOutputStream out1 = bucket.createKey(keyName,
- value.getBytes(UTF_8).length, ReplicationType.RATIS, ONE,
- new HashMap<>());
- out1.write(value.substring(0, value.length() - 1).getBytes(UTF_8));
- assertThrows(IllegalStateException.class, out1::close,
- "Expected IllegalArgumentException due to size mismatch.");
-
- // Simulating second mismatch: Write more data than expected
- OzoneOutputStream out2 = bucket.createKey(keyName,
- value.getBytes(UTF_8).length, ReplicationType.RATIS, ONE,
- new HashMap<>());
- value += "1";
- out2.write(value.getBytes(UTF_8));
- assertThrows(IllegalStateException.class, out2::close,
- "Expected IllegalArgumentException due to size mismatch.");
- } finally {
- client.getProxy().setIsS3Request(false);
- }
- }
-
private OzoneBucket getOzoneBucket() throws IOException {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestOzoneOutputStream.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestOzoneOutputStream.java
index d6a906582f4..a44d614417f 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestOzoneOutputStream.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestOzoneOutputStream.java
@@ -19,6 +19,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -26,8 +27,10 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import org.apache.hadoop.crypto.CryptoOutputStream;
+import org.apache.ratis.util.function.CheckedRunnable;
import org.junit.jupiter.api.Test;
/**
@@ -39,10 +42,10 @@ public class TestOzoneOutputStream {
* Fake KeyOutputStream implementation for testing.
* Uses the package-private KeyOutputStream() constructor.
*/
- private static class FakeKeyOutputStream extends KeyOutputStream
- implements KeyMetadataAware {
+ private static class FakeKeyOutputStream extends KeyOutputStream {
private final Map<String, String> metadata;
+ private List<CheckedRunnable<IOException>> preCommits;
FakeKeyOutputStream(Map<String, String> metadata) {
super(); // VisibleForTesting constructor
@@ -54,6 +57,15 @@ public Map<String, String> getMetadata() {
return metadata;
}
+ @Override
+ public void setPreCommits(List<CheckedRunnable<IOException>> preCommits) {
+ this.preCommits = preCommits;
+ }
+
+ List<CheckedRunnable<IOException>> getPreCommits() {
+ return preCommits;
+ }
+
@Override
public void flush() {
// avoid KeyOutputStream.flush() using null semaphore
@@ -133,6 +145,35 @@ public void testCipherWrapped() throws IOException {
}
}
+ @Test
+ public void testSetPreCommits() throws IOException {
+ FakeKeyOutputStream key =
+ new FakeKeyOutputStream(Collections.emptyMap());
+ List<CheckedRunnable<IOException>> preCommits =
+ Collections.singletonList(() -> { });
+
+ try (OzoneOutputStream ozone = new OzoneOutputStream(key, null)) {
+ ozone.setPreCommits(preCommits);
+ }
+
+ assertSame(preCommits, key.getPreCommits());
+ }
+
+ @Test
+ public void testSetPreCommitsRequiresKeyCommitOutput() throws IOException {
+ OutputStream stream = new OutputStream() {
+ @Override
+ public void write(int b) {
+
+ }
+ };
+
+ try (OzoneOutputStream ozone = new OzoneOutputStream(stream, null)) {
+ assertThrows(IllegalStateException.class,
+ () -> ozone.setPreCommits(Collections.emptyList()));
+ }
+ }
+
/**
* test for Non-KeyMetadataAware stream verify that exception is thrown here.
*/
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
index e9c7f2c9fa8..224a823e14d 100644
---
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
@@ -29,6 +29,7 @@
import static
org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT;
import static
org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_KEY;
import static
org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_ARGUMENT;
+import static
org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_REQUEST;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_TAG;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.newError;
import static org.apache.hadoop.ozone.s3.util.S3Consts.AWS_TAG_PREFIX;
@@ -107,6 +108,7 @@
import org.apache.hadoop.ozone.s3.util.S3Utils;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URLEncodedUtils;
+import org.apache.ratis.util.function.CheckedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -198,7 +200,6 @@ public void initialization() {
ClientProtocol clientProtocol =
getClient().getObjectStore().getClientProxy();
clientProtocol.setThreadLocalS3Auth(s3Auth);
- clientProtocol.setIsS3Request(true);
bufferSize = (int) getOzoneConfiguration().getStorageSize(
OZONE_S3G_CLIENT_BUFFER_SIZE_KEY,
@@ -690,6 +691,19 @@ public static MessageDigest getSha256DigestInstance() {
return SHA_256_PROVIDER.get();
}
+ protected static CheckedRunnable<IOException> validateContentLength(
+ long expectedLength, long actualLength, String keyPath) {
+ return () -> {
+ if (actualLength != expectedLength) {
+ OS3Exception ex = newError(INVALID_REQUEST, keyPath);
+ ex.setErrorMessage(String.format(
+ "Request body length %d does not match expected length %d",
+ actualLength, expectedLength));
+ throw ex;
+ }
+ };
+ }
+
protected static String extractPartsCount(String eTag) {
if (eTag.contains("-")) {
String[] parts = eTag.replace("\"", "").split("-");
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 38cd6965a22..acf358e3ce8 100644
---
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -54,8 +54,6 @@
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -284,27 +282,27 @@ customMetadata, tags, multiDigestInputStream,
getHeaders(),
} else {
final String amzContentSha256Header =
validateSignatureHeader(getHeaders(), keyPath,
signatureInfo.isSignPayload());
- try (OzoneOutputStream output = openKeyForPut(
- volume.getName(), bucketName, keyPath, length,
- replicationConfig, customMetadata, tags, writeConditions)) {
+ final long expectedLength = length;
+ try (S3ObjectWriteGuard output =
+ new S3ObjectWriteGuard(openKeyForPut(
+ volume.getName(), bucketName, keyPath, expectedLength,
+ replicationConfig, customMetadata, tags, writeConditions),
+ expectedLength, keyPath)) {
long metadataLatencyNs =
getMetrics().updatePutKeyMetadataStats(startNanos);
perf.appendMetaLatencyNanos(metadataLatencyNs);
- putLength = IOUtils.copyLarge(multiDigestInputStream, output, 0,
length,
- new byte[getIOBufferSize(length)]);
+ putLength = output.copyFrom(multiDigestInputStream,
getIOBufferSize(expectedLength));
md5Hash = DatatypeConverter.printHexBinary(
multiDigestInputStream.getMessageDigest(OzoneConsts.MD5_HASH).digest())
.toLowerCase();
output.getMetadata().put(OzoneConsts.ETAG, md5Hash);
- List<CheckedRunnable<IOException>> preCommits = new ArrayList<>();
-
String clientContentMD5 =
getHeaders().getHeaderString(S3Consts.CHECKSUM_HEADER);
if (clientContentMD5 != null) {
CheckedRunnable<IOException> checkContentMD5Hook = () -> {
S3Utils.validateContentMD5(clientContentMD5, md5Hash, keyPath);
};
- preCommits.add(checkContentMD5Hook);
+ output.addPreCommit(checkContentMD5Hook);
}
// If sha256Digest exists, this request must validate
x-amz-content-sha256
@@ -317,9 +315,8 @@ customMetadata, tags, multiDigestInputStream, getHeaders(),
throw
S3ErrorTable.newError(S3ErrorTable.X_AMZ_CONTENT_SHA256_MISMATCH, keyPath);
}
};
- preCommits.add(checkSha256Hook);
+ output.addPreCommit(checkSha256Hook);
}
- output.getKeyOutputStream().setPreCommits(preCommits);
}
}
getMetrics().incPutKeySuccessLength(putLength);
@@ -884,18 +881,19 @@ private Response createMultipartKey(OzoneVolume volume,
OzoneBucket ozoneBucket,
+ rangeHeader.getStartOffset() + " actual: " + skipped);
}
}
- try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
- .createMultipartKey(volume.getName(), bucketName, key, length,
- partNumber, uploadID)) {
+ final long expectedLength = length;
+ OzoneOutputStream ozoneOutputStream = getClientProtocol()
+ .createMultipartKey(volume.getName(), bucketName, key,
+ expectedLength, partNumber, uploadID);
+ try (S3ObjectWriteGuard writeGuard =
+ new S3ObjectWriteGuard(ozoneOutputStream, expectedLength, key)) {
metadataLatencyNs =
getMetrics().updateCopyKeyMetadataStats(startNanos);
- copyLength = IOUtils.copyLarge(sourceObject, ozoneOutputStream, 0,
length,
- new byte[getIOBufferSize(length)]);
- ozoneOutputStream.getMetadata()
- .putAll(sourceKeyDetails.getMetadata());
- String raw = ozoneOutputStream.getMetadata().get(OzoneConsts.ETAG);
+ copyLength = writeGuard.copyFrom(sourceObject,
getIOBufferSize(expectedLength));
+ writeGuard.getMetadata().putAll(sourceKeyDetails.getMetadata());
+ String raw = writeGuard.getMetadata().get(OzoneConsts.ETAG);
if (raw != null) {
- ozoneOutputStream.getMetadata().put(OzoneConsts.ETAG,
stripQuotes(raw));
+ writeGuard.getMetadata().put(OzoneConsts.ETAG, stripQuotes(raw));
}
outputStream = ozoneOutputStream;
}
@@ -904,13 +902,13 @@ private Response createMultipartKey(OzoneVolume volume,
OzoneBucket ozoneBucket,
}
} else {
long putLength;
- try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
- .createMultipartKey(volume.getName(), bucketName, key, length,
- partNumber, uploadID)) {
+ final long expectedLength = length;
+ OzoneOutputStream ozoneOutputStream = getClientProtocol()
+ .createMultipartKey(volume.getName(), bucketName, key,
expectedLength, partNumber, uploadID);
+ try (S3ObjectWriteGuard writeGuard = new
S3ObjectWriteGuard(ozoneOutputStream, expectedLength, key)) {
metadataLatencyNs =
getMetrics().updatePutKeyMetadataStats(startNanos);
- putLength = IOUtils.copyLarge(multiDigestInputStream,
ozoneOutputStream, 0, length,
- new byte[getIOBufferSize(length)]);
+ putLength = writeGuard.copyFrom(multiDigestInputStream,
getIOBufferSize(expectedLength));
byte[] digest =
multiDigestInputStream.getMessageDigest(OzoneConsts.MD5_HASH).digest();
String md5Hash =
DatatypeConverter.printHexBinary(digest).toLowerCase();
String clientContentMD5 =
getHeaders().getHeaderString(S3Consts.CHECKSUM_HEADER);
@@ -918,9 +916,9 @@ private Response createMultipartKey(OzoneVolume volume,
OzoneBucket ozoneBucket,
CheckedRunnable<IOException> checkContentMD5Hook = () -> {
S3Utils.validateContentMD5(clientContentMD5, md5Hash, key);
};
-
ozoneOutputStream.getKeyOutputStream().setPreCommits(Collections.singletonList(checkContentMD5Hook));
+ writeGuard.addPreCommit(checkContentMD5Hook);
}
- ozoneOutputStream.getMetadata().put(OzoneConsts.ETAG, md5Hash);
+ writeGuard.getMetadata().put(OzoneConsts.ETAG, md5Hash);
outputStream = ozoneOutputStream;
}
getMetrics().incPutKeySuccessLength(putLength);
@@ -991,13 +989,14 @@ srcKeyLen > getDatastreamMinLength()) {
getChunkSize(), replication, metadata, src, perf, startNanos,
tags,
writeConditions);
} else {
- try (OzoneOutputStream dest = openKeyForPut(
- volume.getName(), destBucket, destKey, srcKeyLen,
- replication, metadata, tags, writeConditions)) {
+ final long expectedLength = srcKeyLen;
+ try (S3ObjectWriteGuard dest = new S3ObjectWriteGuard(openKeyForPut(
+ volume.getName(), destBucket, destKey, expectedLength,
+ replication, metadata, tags, writeConditions), expectedLength,
destKey)) {
long metadataLatencyNs =
getMetrics().updateCopyKeyMetadataStats(startNanos);
perf.appendMetaLatencyNanos(metadataLatencyNs);
- copyLength = IOUtils.copyLarge(src, dest, 0, srcKeyLen, new
byte[getIOBufferSize(srcKeyLen)]);
+ copyLength = dest.copyFrom(src, getIOBufferSize(expectedLength));
String md5Hash =
DatatypeConverter.printHexBinary(src.getMessageDigest().digest()).toLowerCase();
dest.getMetadata().put(OzoneConsts.ETAG, md5Hash);
}
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
index fa1cde66049..55420a1668c 100644
---
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
@@ -24,12 +24,8 @@
import static org.apache.hadoop.ozone.s3.util.S3Utils.wrapInQuotes;
import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
import java.security.DigestInputStream;
import java.security.MessageDigest;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
@@ -38,7 +34,6 @@
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.OzoneBucket;
-import org.apache.hadoop.ozone.client.io.KeyMetadataAware;
import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.om.OmConfig;
import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -121,23 +116,23 @@ public static Pair<String, Long> putKeyWithStream(
final String amzContentSha256Header = validateSignatureHeader(headers,
keyPath, isSignedPayload);
long writeLen;
String md5Hash;
- try (OzoneDataStreamOutput streamOutput = openStreamKeyForPut(bucket,
- keyPath, length, replicationConfig, keyMetadata, tags,
- writeConditions)) {
+ try (S3ObjectStreamingWriteGuard writeGuard =
+ new S3ObjectStreamingWriteGuard(openStreamKeyForPut(bucket,
+ keyPath, length, replicationConfig, keyMetadata, tags,
+ writeConditions), length, keyPath)) {
long metadataLatencyNs = METRICS.updatePutKeyMetadataStats(startNanos);
- writeLen = writeToStreamOutput(streamOutput, body, bufferSize, length);
+ writeLen = writeGuard.copyFrom(body, bufferSize);
md5Hash =
DatatypeConverter.printHexBinary(body.getMessageDigest(OzoneConsts.MD5_HASH).digest())
.toLowerCase();
perf.appendMetaLatencyNanos(metadataLatencyNs);
- ((KeyMetadataAware)streamOutput).getMetadata().put(OzoneConsts.ETAG,
md5Hash);
+ writeGuard.getMetadata().put(OzoneConsts.ETAG, md5Hash);
- List<CheckedRunnable<IOException>> preCommits = new ArrayList<>();
String clientContentMD5 =
headers.getHeaderString(S3Consts.CHECKSUM_HEADER);
if (clientContentMD5 != null) {
CheckedRunnable<IOException> checkContentMD5Hook = () -> {
S3Utils.validateContentMD5(clientContentMD5, md5Hash, keyPath);
};
- preCommits.add(checkContentMD5Hook);
+ writeGuard.addPreCommit(checkContentMD5Hook);
}
// If sha256Digest exists, this request must validate
x-amz-content-sha256
@@ -150,10 +145,8 @@ public static Pair<String, Long> putKeyWithStream(
throw
S3ErrorTable.newError(S3ErrorTable.X_AMZ_CONTENT_SHA256_MISMATCH, keyPath);
}
};
- preCommits.add(checkSha256Hook);
+ writeGuard.addPreCommit(checkSha256Hook);
}
-
- streamOutput.setPreCommits(preCommits);
}
return Pair.of(md5Hash, writeLen);
}
@@ -189,38 +182,21 @@ public static long copyKeyWithStream(
S3ConditionalRequest.WriteConditions writeConditions)
throws IOException {
long writeLen;
- try (OzoneDataStreamOutput streamOutput = openStreamKeyForPut(bucket,
- keyPath, length, replicationConfig, keyMetadata, tags,
- writeConditions)) {
+ try (S3ObjectStreamingWriteGuard writeGuard =
+ new S3ObjectStreamingWriteGuard(openStreamKeyForPut(bucket,
+ keyPath, length, replicationConfig, keyMetadata, tags,
+ writeConditions), length, keyPath)) {
long metadataLatencyNs =
METRICS.updateCopyKeyMetadataStats(startNanos);
- writeLen = writeToStreamOutput(streamOutput, body, bufferSize, length);
+ writeLen = writeGuard.copyFrom(body, bufferSize);
String eTag =
DatatypeConverter.printHexBinary(body.getMessageDigest().digest())
.toLowerCase();
perf.appendMetaLatencyNanos(metadataLatencyNs);
- ((KeyMetadataAware)streamOutput).getMetadata().put(OzoneConsts.ETAG,
eTag);
+ writeGuard.getMetadata().put(OzoneConsts.ETAG, eTag);
}
return writeLen;
}
- private static long writeToStreamOutput(OzoneDataStreamOutput streamOutput,
- InputStream body, int bufferSize,
- long length)
- throws IOException {
- final byte[] buffer = new byte[bufferSize];
- long n = 0;
- while (n < length) {
- final int toRead = Math.toIntExact(Math.min(bufferSize, length - n));
- final int readLength = body.read(buffer, 0, toRead);
- if (readLength == -1) {
- break;
- }
- streamOutput.write(ByteBuffer.wrap(buffer, 0, readLength));
- n += readLength;
- }
- return n;
- }
-
@SuppressWarnings("checkstyle:ParameterNumber")
public static Response createMultipartKey(OzoneBucket ozoneBucket, String
key,
long length, int partNumber, String uploadID, int chunkSize,
@@ -229,23 +205,22 @@ public static Response createMultipartKey(OzoneBucket
ozoneBucket, String key,
long startNanos = Time.monotonicNowNanos();
String eTag;
try {
- try (OzoneDataStreamOutput streamOutput = ozoneBucket
- .createMultipartStreamKey(key, length, partNumber, uploadID)) {
+ try (S3ObjectStreamingWriteGuard writeGuard =
+ new S3ObjectStreamingWriteGuard(ozoneBucket
+ .createMultipartStreamKey(key, length, partNumber, uploadID),
+ length, key)) {
long metadataLatencyNs = METRICS.updatePutKeyMetadataStats(startNanos);
- long putLength =
- writeToStreamOutput(streamOutput, body, chunkSize, length);
+ long putLength = writeGuard.copyFrom(body, chunkSize);
eTag = DatatypeConverter.printHexBinary(
body.getMessageDigest(OzoneConsts.MD5_HASH).digest()).toLowerCase();
- List<CheckedRunnable<IOException>> preCommits = new ArrayList<>();
String clientContentMD5 =
headers.getHeaderString(S3Consts.CHECKSUM_HEADER);
if (clientContentMD5 != null) {
CheckedRunnable<IOException> checkContentMD5Hook = () -> {
S3Utils.validateContentMD5(clientContentMD5, eTag, key);
};
- preCommits.add(checkContentMD5Hook);
+ writeGuard.addPreCommit(checkContentMD5Hook);
}
- streamOutput.setPreCommits(preCommits);
- ((KeyMetadataAware)streamOutput).getMetadata().put(OzoneConsts.ETAG,
eTag);
+ writeGuard.getMetadata().put(OzoneConsts.ETAG, eTag);
METRICS.incPutKeySuccessLength(putLength);
perf.appendMetaLatencyNanos(metadataLatencyNs);
perf.appendSizeBytes(putLength);
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/S3ObjectStreamingWriteGuard.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/S3ObjectStreamingWriteGuard.java
new file mode 100644
index 00000000000..31e862b9796
--- /dev/null
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/S3ObjectStreamingWriteGuard.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.s3.endpoint;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.hadoop.ozone.client.io.KeyMetadataAware;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
+
+/**
+ * Guards close-time commit for write request using datastream output.
+ */
+final class S3ObjectStreamingWriteGuard extends S3ObjectWriteGuard {
+
+ private final OzoneDataStreamOutput outputStream;
+
+ S3ObjectStreamingWriteGuard(
+ OzoneDataStreamOutput outputStream, long expectedLength, String keyPath)
{
+ super(outputStream, expectedLength, keyPath);
+ this.outputStream = outputStream;
+ outputStream.setPreCommits(getPreCommits());
+ }
+
+ @Override
+ protected void write(byte[] buffer, int offset, int length)
+ throws IOException {
+ outputStream.write(ByteBuffer.wrap(buffer, offset, length));
+ }
+
+ @Override
+ public Map<String, String> getMetadata() {
+ return ((KeyMetadataAware) outputStream).getMetadata();
+ }
+
+ @Override
+ public void close() throws IOException {
+ outputStream.close();
+ }
+}
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/S3ObjectWriteGuard.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/S3ObjectWriteGuard.java
new file mode 100644
index 00000000000..45dcfb15088
--- /dev/null
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/S3ObjectWriteGuard.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.s3.endpoint;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.ratis.util.function.CheckedRunnable;
+
+/**
+ * Tracks bytes written for a write request and guards close-time commit.
+ */
+class S3ObjectWriteGuard implements AutoCloseable {
+
+ private final OutputStream outputStream;
+ private final long expectedLength;
+ private final List<CheckedRunnable<IOException>> preCommits =
+ new ArrayList<>();
+ private long writtenLength;
+
+ S3ObjectWriteGuard(
+ OzoneOutputStream outputStream, long expectedLength, String keyPath) {
+ this.outputStream = outputStream;
+ this.expectedLength = expectedLength;
+ addContentLengthValidation(keyPath);
+ outputStream.setPreCommits(getPreCommits());
+ }
+
+ protected S3ObjectWriteGuard(
+ OutputStream outputStream, long expectedLength, String keyPath) {
+ this.outputStream = outputStream;
+ this.expectedLength = expectedLength;
+ addContentLengthValidation(keyPath);
+ }
+
+ private void addContentLengthValidation(String keyPath) {
+ preCommits.add(() -> EndpointBase.validateContentLength(
+ expectedLength, writtenLength, keyPath).run());
+ }
+
+ protected List<CheckedRunnable<IOException>> getPreCommits() {
+ return preCommits;
+ }
+
+ void addPreCommit(CheckedRunnable<IOException> preCommit) {
+ preCommits.add(preCommit);
+ }
+
+ long copyFrom(InputStream body, int bufferSize) throws IOException {
+ byte[] buffer = new byte[bufferSize];
+ while (writtenLength < expectedLength) {
+ int toRead = Math.toIntExact(
+ Math.min(bufferSize, expectedLength - writtenLength));
+ int readLength = body.read(buffer, 0, toRead);
+ if (readLength == -1) {
+ break;
+ }
+ write(buffer, 0, readLength);
+ writtenLength += readLength;
+ }
+ return writtenLength;
+ }
+
+ protected void write(byte[] buffer, int offset, int length)
+ throws IOException {
+ outputStream.write(buffer, offset, length);
+ }
+
+ public Map<String, String> getMetadata() {
+ return ((OzoneOutputStream) outputStream).getMetadata();
+ }
+
+ @Override
+ public void close() throws IOException {
+ outputStream.close();
+ }
+}
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
index 35956bc1df8..0ef0a790388 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
@@ -688,11 +688,6 @@ public void setThreadLocalS3Auth(S3Auth s3Auth) {
}
- @Override
- public void setIsS3Request(boolean isS3Request) {
-
- }
-
@Override
public S3Auth getThreadLocalS3Auth() {
return null;
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
index 3b12b72540f..10ed365b2b0 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
@@ -145,7 +145,9 @@ public OzoneOutputStream createKey(String key, long size,
new KeyMetadataAwareOutputStream(metadata) {
@Override
public void close() throws IOException {
- keyContents.put(key, toByteArray());
+ byte[] bytes = toByteArray();
+ super.close();
+ keyContents.put(key, bytes);
keyDetails.put(key, new OzoneKeyDetails(
getVolumeName(),
getName(),
@@ -158,7 +160,6 @@ public void close() throws IOException {
UserGroupInformation.getCurrentUser().getShortUserName(),
tags
));
- super.close();
}
};
@@ -179,7 +180,9 @@ public OzoneOutputStream rewriteKey(String keyName, long
size, long existingKeyG
new KeyMetadataAwareOutputStream(metadata) {
@Override
public void close() throws IOException {
- keyContents.put(keyName, toByteArray());
+ byte[] bytes = toByteArray();
+ super.close();
+ keyContents.put(keyName, bytes);
keyDetails.put(keyName, new OzoneKeyDetails(
getVolumeName(),
getName(),
@@ -190,7 +193,6 @@ public void close() throws IOException {
new ArrayList<>(), finalReplicationCon, metadata, null,
() -> readKey(keyName), true, null, null
));
- super.close();
}
};
@@ -531,8 +533,10 @@ public OzoneOutputStream createMultipartKey(String key,
long size,
new KeyMetadataAwareOutputStream((int) size, new HashMap<>()) {
@Override
public void close() throws IOException {
- Part part = new Part(key + size,
- toByteArray(), getMetadata().get(ETAG));
+ byte[] bytes = toByteArray();
+ String eTag = getMetadata().get(ETAG);
+ super.close();
+ Part part = new Part(key + size, bytes, eTag);
if (partList.get(key) == null) {
Map<Integer, Part> parts = new TreeMap<>();
parts.put(partNumber, part);
@@ -540,7 +544,6 @@ public void close() throws IOException {
} else {
partList.get(key).put(partNumber, part);
}
- super.close();
}
};
return new OzoneOutputStreamStub(keyOutputStream, key + size);
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/EndpointTestUtils.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/EndpointTestUtils.java
index 11ba8daf804..8827150069f 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/EndpointTestUtils.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/EndpointTestUtils.java
@@ -25,6 +25,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.List;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.HttpHeaders;
@@ -123,13 +124,28 @@ public static Response put(
int partNumber,
String uploadID,
String content
+ ) throws IOException, OS3Exception {
+ return put(subject, bucket, key, partNumber, uploadID,
+ contentLength(content), content);
+ }
+
+ /** Put with content, part number, upload ID, and explicit Content-Length. */
+ public static Response put(
+ ObjectEndpoint subject,
+ String bucket,
+ String key,
+ int partNumber,
+ String uploadID,
+ long contentLength,
+ String content
) throws IOException, OS3Exception {
if (uploadID != null) {
subject.queryParamsForTest().set(S3Consts.QueryParams.UPLOAD_ID,
uploadID);
}
subject.queryParamsForTest().setInt(S3Consts.QueryParams.PART_NUMBER,
partNumber);
when(subject.getContext().getMethod()).thenReturn(HttpMethod.PUT);
- setLengthHeader(subject, content);
+ when(subject.getHeaders().getHeaderString(HttpHeaders.CONTENT_LENGTH))
+ .thenReturn(String.valueOf(contentLength));
if (content == null) {
return subject.put(bucket, key, null);
@@ -264,9 +280,44 @@ public static OS3Exception
assertErrorResponse(S3ErrorTable expected, CheckedSup
}
private static void setLengthHeader(ObjectEndpoint subject, String content) {
- final long length = content != null ? content.length() : 0;
when(subject.getHeaders().getHeaderString(HttpHeaders.CONTENT_LENGTH))
- .thenReturn(String.valueOf(length));
+ .thenReturn(String.valueOf(contentLength(content)));
+ }
+
+ private static long contentLength(String content) {
+ return content != null ? content.getBytes(UTF_8).length : 0;
+ }
+
+ static final class FailingInputStream extends InputStream {
+
+ private final byte[] content;
+ private final int failAfterBytes;
+ private int position;
+
+ FailingInputStream(byte[] content, int failAfterBytes) {
+ this.content = content;
+ this.failAfterBytes = failAfterBytes;
+ }
+
+ @Override
+ public int read(byte[] buffer, int offset, int length) throws IOException {
+ if (position >= failAfterBytes) {
+ throw new IOException("upload interrupted");
+ }
+
+ int bytesToRead = Math.min(length, failAfterBytes - position);
+ System.arraycopy(content, position, buffer, offset, bytesToRead);
+ position += bytesToRead;
+ return bytesToRead;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (position >= failAfterBytes) {
+ throw new IOException("upload interrupted");
+ }
+ return content[position++];
+ }
}
private EndpointTestUtils() {
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
index bcd4cdd9084..c9313207c01 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.s3.endpoint;
import static
org.apache.hadoop.ozone.client.OzoneClientTestUtils.assertKeyContent;
+import static
org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.FailingInputStream;
import static
org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.assertErrorResponse;
import static
org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.assertSucceeds;
import static org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.put;
@@ -44,8 +45,10 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
@@ -55,19 +58,17 @@
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Map;
import java.util.stream.Stream;
+import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
-import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
@@ -264,15 +265,18 @@ void testPutObjectWithSignedChunks() throws Exception {
@Test
public void testPutObjectMessageDigestResetDuringException() {
MessageDigest messageDigest = mock(MessageDigest.class);
- try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class);
- MockedStatic<EndpointBase> endpoint = mockStatic(EndpointBase.class)) {
+ try (MockedStatic<EndpointBase> endpoint =
+ mockStatic(EndpointBase.class, CALLS_REAL_METHODS)) {
// For example, EOFException during put-object due to client cancelling
the operation before it completes
- mocked.when(() -> IOUtils.copyLarge(any(InputStream.class),
any(OutputStream.class), anyLong(),
- anyLong(), any(byte[].class)))
- .thenThrow(IOException.class);
+ when(messageDigest.getAlgorithm()).thenReturn(OzoneConsts.MD5_HASH);
endpoint.when(EndpointBase::getMD5DigestInstance).thenReturn(messageDigest);
+ setPutRequestLength(CONTENT.length());
- assertThrows(IOException.class, () -> putObject(CONTENT).close());
+ IOException ex = assertThrows(IOException.class,
+ () -> objectEndpoint.put(BUCKET_NAME, KEY_NAME,
+ new FailingInputStream(CONTENT.getBytes(StandardCharsets.UTF_8),
5)).close());
+ assertEquals("upload interrupted", ex.getMessage());
+ assertThrows(IOException.class, () -> bucket.getKey(KEY_NAME));
// Verify that the message digest is reset so that the instance can be
reused for the
// next request in the same thread
@@ -386,20 +390,21 @@ public void
testCopyObjectMessageDigestResetDuringException() throws Exception {
assertThat(keyDetails.getMetadata().get(OzoneConsts.ETAG)).isNotEmpty();
MessageDigest messageDigest = mock(MessageDigest.class);
- try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class);
- MockedStatic<EndpointBase> endpoint = mockStatic(EndpointBase.class)) {
+ try (MockedStatic<EndpointBase> endpoint =
+ mockStatic(EndpointBase.class, CALLS_REAL_METHODS)) {
// Add the mocked methods only during the copy request
endpoint.when(EndpointBase::getMD5DigestInstance).thenReturn(messageDigest);
- endpoint.when(() ->
EndpointBase.parseSourceHeader(any())).thenCallRealMethod();
- mocked.when(() -> IOUtils.copyLarge(any(InputStream.class),
any(OutputStream.class), anyLong(),
- anyLong(), any(byte[].class)))
- .thenThrow(IOException.class);
+ doThrow(new RuntimeException("digest interrupted"))
+ .when(messageDigest).update(any(byte[].class), anyInt(), anyInt());
// Add copy header, and then call put
when(headers.getHeaderString(COPY_SOURCE_HEADER)).thenReturn(
BUCKET_NAME + "/" + urlEncode(KEY_NAME));
- assertThrows(IOException.class, () -> putObject(DEST_BUCKET_NAME,
DEST_KEY).close());
+ RuntimeException ex = assertThrows(RuntimeException.class,
+ () -> putObject(DEST_BUCKET_NAME, DEST_KEY).close());
+ assertEquals("digest interrupted", ex.getMessage());
+ assertThrows(IOException.class, () -> destBucket.getKey(DEST_KEY));
// Verify that the message digest is reset so that the instance can be
reused for the
// next request in the same thread
verify(messageDigest, times(1)).reset();
@@ -641,6 +646,14 @@ public void testPutEmptyObject() throws Exception {
assertEquals(0, bucket.getKey(KEY_NAME).getDataSize());
}
+ @Test
+ public void testPutObjectRejectsIncompleteBody() {
+ assertErrorResponse(INVALID_REQUEST,
+ () -> put(objectEndpoint, BUCKET_NAME, KEY_NAME, 0,
+ null, CONTENT.length() + 1, CONTENT));
+ assertThrows(IOException.class, () -> bucket.getKey(KEY_NAME));
+ }
+
@Test
public void testPutObjectWithContentMD5() throws Exception {
// GIVEN
@@ -804,4 +817,10 @@ private Response putObject(String bucketName, String
keyName) throws IOException
private Response putObject(String content) throws IOException, OS3Exception {
return put(objectEndpoint, BUCKET_NAME, KEY_NAME, content);
}
+
+ private void setPutRequestLength(long length) {
+ when(objectEndpoint.getContext().getMethod()).thenReturn(HttpMethod.PUT);
+ when(headers.getHeaderString(HttpHeaders.CONTENT_LENGTH))
+ .thenReturn(String.valueOf(length));
+ }
}
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
index 8f4cf0069d7..49adfa06511 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.s3.endpoint;
+import static
org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.FailingInputStream;
import static
org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.assertErrorResponse;
import static
org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.assertSucceeds;
import static
org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.initiateMultipartUpload;
@@ -27,10 +28,8 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyLong;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
@@ -40,16 +39,15 @@
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.UUID;
import java.util.stream.Stream;
+import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
-import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
@@ -58,6 +56,7 @@
import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
+import org.apache.hadoop.ozone.s3.util.S3Consts;
import org.apache.hadoop.ozone.s3.util.S3StorageType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -142,33 +141,27 @@ public void testPartUploadWithStandardIA() throws
Exception {
assertContentLength(uploadID, keyName, content.length());
}
- @Test
- public void testPartUploadWithStandardIAAndContentMD5() throws Exception {
- when(headers.getHeaderString(STORAGE_CLASS_HEADER))
- .thenReturn(S3StorageType.STANDARD_IA.name(), (String)null);
- String content = "Multipart Upload Part";
- byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8);
- byte[] md5Bytes = MessageDigest.getInstance("MD5").digest(contentBytes);
- String md5Base64 = Base64.getEncoder().encodeToString(md5Bytes);
- when(headers.getHeaderString("Content-MD5")).thenReturn(md5Base64);
-
- String keyName = UUID.randomUUID().toString();
- String uploadID = initiateMultipartUpload(rest, OzoneConsts.S3_BUCKET,
keyName);
-
- try (Response response = put(rest, OzoneConsts.S3_BUCKET, keyName, 1,
- uploadID, content)) {
- assertNotNull(response.getHeaderString(OzoneConsts.ETAG));
- assertEquals(200, response.getStatus());
- }
- assertContentLength(uploadID, keyName, content.length());
- }
-
@Test
public void testPartUploadWithIncorrectUploadID() {
assertErrorResponse(S3ErrorTable.NO_SUCH_UPLOAD,
() -> put(rest, OzoneConsts.S3_BUCKET, OzoneConsts.KEY, 1, "random",
"any"));
}
+ @Test
+ public void testPartUploadRejectsIncompleteBody() throws Exception {
+ String uploadID = initiateMultipartUpload(rest, OzoneConsts.S3_BUCKET,
+ OzoneConsts.KEY);
+ String content = "Multipart Upload";
+
+ assertErrorResponse(S3ErrorTable.INVALID_REQUEST,
+ () -> put(rest, OzoneConsts.S3_BUCKET, OzoneConsts.KEY,
+ 1, uploadID, content.length() + 1, content));
+ OzoneMultipartUploadPartListParts parts =
+ client.getObjectStore().getS3Bucket(OzoneConsts.S3_BUCKET)
+ .listParts(OzoneConsts.KEY, uploadID, 0, 100);
+ assertEquals(0, parts.getPartInfoList().size());
+ }
+
@Test
public void testPartUploadStreamContentLength()
throws IOException, OS3Exception {
@@ -195,34 +188,28 @@ public void
testPartUploadMessageDigestResetDuringException() throws IOException
String uploadID = initiateMultipartUpload(rest, OzoneConsts.S3_BUCKET,
OzoneConsts.KEY);
MessageDigest messageDigest = mock(MessageDigest.class);
- when(messageDigest.getAlgorithm()).thenReturn("MD5");
+ when(messageDigest.getAlgorithm()).thenReturn(OzoneConsts.MD5_HASH);
MessageDigest sha256Digest = mock(MessageDigest.class);
- when(sha256Digest.getAlgorithm()).thenReturn("SHA-256");
- try (MockedStatic<IOUtils> ioutils = mockStatic(IOUtils.class);
- MockedStatic<ObjectEndpointStreaming> streaming =
mockStatic(ObjectEndpointStreaming.class);
- MockedStatic<EndpointBase> endpoint = mockStatic(EndpointBase.class)) {
+ when(sha256Digest.getAlgorithm()).thenReturn(OzoneConsts.FILE_HASH);
+ try (MockedStatic<EndpointBase> endpoint =
+ mockStatic(EndpointBase.class, CALLS_REAL_METHODS)) {
// Add the mocked methods only during part upload
endpoint.when(EndpointBase::getMD5DigestInstance).thenReturn(messageDigest);
endpoint.when(EndpointBase::getSha256DigestInstance).thenReturn(sha256Digest);
- if (enableDataStream) {
- streaming.when(() -> ObjectEndpointStreaming.createMultipartKey(any(),
any(), anyLong(), anyInt(), any(),
- anyInt(), any(), any(), any()))
- .thenThrow(IOException.class);
- } else {
- ioutils.when(() -> IOUtils.copyLarge(any(InputStream.class),
any(OutputStream.class), anyLong(),
- anyLong(), any(byte[].class)))
- .thenThrow(IOException.class);
- }
String content = "Multipart Upload";
- try (Response ignored = put(rest, OzoneConsts.S3_BUCKET,
OzoneConsts.KEY, 1, uploadID, content)) {
- fail("Should throw IOException");
- } catch (IOException ignored) {
- // Verify that the message digest is reset so that the instance can be
reused for the
- // next request in the same thread
- verify(messageDigest, times(1)).reset();
- verify(sha256Digest, times(1)).reset();
+ try (InputStream body = new FailingInputStream(
+ content.getBytes(StandardCharsets.UTF_8), 5)) {
+ IOException ex = assertThrows(IOException.class,
+ () -> putPart(rest, OzoneConsts.S3_BUCKET, OzoneConsts.KEY, 1,
+ uploadID, content.length(), body).close());
+ assertEquals("upload interrupted", ex.getMessage());
}
+
+ // Verify that the message digest is reset so that the instance can be
reused for the
+ // next request in the same thread
+ verify(messageDigest, times(1)).reset();
+ verify(sha256Digest, times(1)).reset();
}
}
@@ -293,4 +280,16 @@ private void assertContentLength(String uploadID, String
key,
assertEquals(contentLength,
parts.getPartInfoList().get(0).getSize());
}
+
+ private static Response putPart(ObjectEndpoint subject, String bucket,
+ String key, int partNumber, String uploadID, long contentLength,
+ InputStream body) throws IOException, OS3Exception {
+ subject.queryParamsForTest().set(S3Consts.QueryParams.UPLOAD_ID, uploadID);
+ subject.queryParamsForTest().setInt(S3Consts.QueryParams.PART_NUMBER,
+ partNumber);
+ when(subject.getContext().getMethod()).thenReturn(HttpMethod.PUT);
+ when(subject.getHeaders().getHeaderString(HttpHeaders.CONTENT_LENGTH))
+ .thenReturn(String.valueOf(contentLength));
+ return subject.put(bucket, key, body);
+ }
}
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestUploadWithStream.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestUploadWithStream.java
index 66d4a4cbef6..ddbfeb2b866 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestUploadWithStream.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestUploadWithStream.java
@@ -19,17 +19,22 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD;
+import static
org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.FailingInputStream;
import static
org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.assertSucceeds;
import static org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.put;
import static org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_HEADER;
import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
import static org.apache.hadoop.ozone.s3.util.S3Consts.X_AMZ_CONTENT_SHA256;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.io.IOException;
import java.io.OutputStream;
+import java.security.MessageDigest;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.ws.rs.core.HttpHeaders;
@@ -39,9 +44,12 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.AuditLogger;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientStub;
+import org.apache.hadoop.ozone.s3.MultiDigestInputStream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -92,6 +100,28 @@ public void testUpload() throws Exception {
assertSucceeds(() -> put(rest, S3BUCKET, S3KEY,
S3_COPY_EXISTING_KEY_CONTENT));
}
+ @Test
+ public void testUploadDoesNotCommitWhenBodyReadFails() throws Exception {
+ OzoneBucket bucket = client.getObjectStore().getS3Bucket(S3BUCKET);
+ byte[] keyContent = S3_COPY_EXISTING_KEY_CONTENT.getBytes(UTF_8);
+ try (FailingInputStream failing = new FailingInputStream(keyContent, 5);
+ MultiDigestInputStream body = new MultiDigestInputStream(failing,
+ Collections.singletonList(
+ MessageDigest.getInstance(OzoneConsts.MD5_HASH)))) {
+
+ IOException ex = assertThrows(IOException.class,
+ () -> ObjectEndpointStreaming.put(bucket, S3KEY, keyContent.length,
+ ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS,
+ ReplicationFactor.THREE), rest.getChunkSize(),
+ new HashMap<>(), new HashMap<>(), body, rest.getHeaders(), true,
+ new AuditLogger.PerformanceStringBuilder(),
+ S3ConditionalRequest.parseWriteConditions(rest.getHeaders(),
+ S3KEY)));
+ assertEquals("upload interrupted", ex.getMessage());
+ assertThrows(IOException.class, () -> bucket.getKey(S3KEY));
+ }
+ }
+
@Test
public void testUploadWithCopy() throws Exception {
OzoneBucket bucket =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]