This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new e14fb8e092 HDDS-9526. Two S3G instances writing the same key may cause
data loss in case of an exception. (#5524)
e14fb8e092 is described below
commit e14fb8e092cbbcd562877864aff1bc66af9fe59b
Author: XiChen <[email protected]>
AuthorDate: Thu Nov 9 22:09:48 2023 +0800
HDDS-9526. Two S3G instances writing the same key may cause data loss in
case of an exception. (#5524)
---
.../client/io/BlockDataStreamOutputEntryPool.java | 4 +
.../client/io/BlockOutputStreamEntryPool.java | 3 +
.../hadoop/ozone/client/io/ECKeyOutputStream.java | 15 ++
.../ozone/client/io/KeyDataStreamOutput.java | 27 +++-
.../hadoop/ozone/client/io/KeyOutputStream.java | 31 +++-
.../ozone/client/io/OzoneDataStreamOutput.java | 21 ++-
.../hadoop/ozone/client/io/OzoneOutputStream.java | 23 ++-
.../ozone/client/protocol/ClientProtocol.java | 3 +
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 27 +++-
.../hadoop/ozone/client/TestOzoneClient.java | 34 +++++
.../hadoop/ozone/s3/endpoint/EndpointBase.java | 7 +-
.../hadoop/ozone/s3/endpoint/ObjectEndpoint.java | 166 ++++++++++++---------
.../ozone/s3/endpoint/ObjectEndpointStreaming.java | 30 ++--
.../hadoop/ozone/client/ClientProtocolStub.java | 5 +
.../hadoop/ozone/client/OzoneBucketStub.java | 2 +-
.../hadoop/ozone/client/OzoneOutputStreamStub.java | 15 ++
.../s3/endpoint/TestMultipartUploadWithCopy.java | 23 +++
.../hadoop/ozone/s3/endpoint/TestObjectPut.java | 44 ++++++
.../hadoop/ozone/s3/endpoint/TestPartUpload.java | 73 ++++++++-
19 files changed, 442 insertions(+), 111 deletions(-)
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
index 10b16f800d..d4bccd5535 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
@@ -289,6 +289,10 @@ public class BlockDataStreamOutputEntryPool implements
KeyMetadataAware {
return totalDataLen;
}
+ public long getDataSize() {
+ return keyArgs.getDataSize();
+ }
+
@Override
public Map<String, String> getMetadata() {
return this.keyArgs.getMetadata();
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index 7b0259e379..65c1cd4caa 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -438,4 +438,7 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
return null;
}
+ long getDataSize() {
+ return keyArgs.getDataSize();
+ }
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 242b2606f8..15ebccda28 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -78,6 +78,14 @@ public final class ECKeyOutputStream extends KeyOutputStream
private final Future<Boolean> flushFuture;
private final AtomicLong flushCheckpoint;
+ /**
+ * Indicates if an atomic write is required. When set to true,
+ * the amount of data written must match the declared size during the commit.
+ * A mismatch will prevent the commit from succeeding.
+ * This is essential for operations like S3 put to ensure atomicity.
+ */
+ private boolean atomicKeyCreation;
+
private enum StripeWriteStatus {
SUCCESS,
FAILED
@@ -155,6 +163,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
flushExecutor.submit(() -> s3CredentialsProvider.set(s3Auth));
this.flushFuture = this.flushExecutor.submit(this::flushStripeFromQueue);
this.flushCheckpoint = new AtomicLong(0);
+ this.atomicKeyCreation = builder.getAtomicKeyCreation();
}
/**
@@ -512,6 +521,12 @@ public final class ECKeyOutputStream extends
KeyOutputStream
Preconditions.checkArgument(writeOffset == offset,
"Expected writeOffset= " + writeOffset
+ " Expected offset=" + offset);
+ if (atomicKeyCreation) {
+ long expectedSize = blockOutputStreamEntryPool.getDataSize();
+ Preconditions.checkState(expectedSize == offset, String.format(
+ "Expected: %d and actual %d write sizes do not match",
+ expectedSize, offset));
+ }
blockOutputStreamEntryPool.commitKey(offset);
}
} catch (ExecutionException e) {
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index a6331151e3..2368cd78e9 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -81,6 +81,14 @@ public class KeyDataStreamOutput extends
AbstractDataStreamOutput
private long clientID;
+ /**
+ * Indicates if an atomic write is required. When set to true,
+ * the amount of data written must match the declared size during the commit.
+ * A mismatch will prevent the commit from succeeding.
+ * This is essential for operations like S3 put to ensure atomicity.
+ */
+ private boolean atomicKeyCreation;
+
@VisibleForTesting
public List<BlockDataStreamOutputEntry> getStreamEntries() {
return blockDataStreamOutputEntryPool.getStreamEntries();
@@ -109,7 +117,8 @@ public class KeyDataStreamOutput extends
AbstractDataStreamOutput
OzoneManagerProtocol omClient, int chunkSize,
String requestId, ReplicationConfig replicationConfig,
String uploadID, int partNumber, boolean isMultipart,
- boolean unsafeByteBufferConversion
+ boolean unsafeByteBufferConversion,
+ boolean atomicKeyCreation
) {
super(HddsClientUtils.getRetryPolicyByException(
config.getMaxRetryCount(), config.getRetryInterval()));
@@ -130,6 +139,7 @@ public class KeyDataStreamOutput extends
AbstractDataStreamOutput
// encrypted bucket.
this.writeOffset = 0;
this.clientID = handler.getId();
+ this.atomicKeyCreation = atomicKeyCreation;
}
/**
@@ -387,6 +397,12 @@ public class KeyDataStreamOutput extends
AbstractDataStreamOutput
if (!isException()) {
Preconditions.checkArgument(writeOffset == offset);
}
+ if (atomicKeyCreation) {
+ long expectedSize = blockDataStreamOutputEntryPool.getDataSize();
+ Preconditions.checkArgument(expectedSize == offset,
+ String.format("Expected: %d and actual %d write sizes do not
match",
+ expectedSize, offset));
+ }
blockDataStreamOutputEntryPool.commitKey(offset);
} finally {
blockDataStreamOutputEntryPool.cleanup();
@@ -422,6 +438,7 @@ public class KeyDataStreamOutput extends
AbstractDataStreamOutput
private boolean unsafeByteBufferConversion;
private OzoneClientConfig clientConfig;
private ReplicationConfig replicationConfig;
+ private boolean atomicKeyCreation = false;
public Builder setMultipartUploadID(String uploadID) {
this.multipartUploadID = uploadID;
@@ -474,6 +491,11 @@ public class KeyDataStreamOutput extends
AbstractDataStreamOutput
return this;
}
+ public Builder setAtomicKeyCreation(boolean atomicKey) {
+ this.atomicKeyCreation = atomicKey;
+ return this;
+ }
+
public KeyDataStreamOutput build() {
return new KeyDataStreamOutput(
clientConfig,
@@ -486,7 +508,8 @@ public class KeyDataStreamOutput extends
AbstractDataStreamOutput
multipartUploadID,
multipartNumber,
isMultipartKey,
- unsafeByteBufferConversion);
+ unsafeByteBufferConversion,
+ atomicKeyCreation);
}
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index fa23f88544..4e0c4c91fa 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -95,6 +95,14 @@ public class KeyOutputStream extends OutputStream
private long clientID;
+ /**
+ * Indicates if an atomic write is required. When set to true,
+ * the amount of data written must match the declared size during the commit.
+ * A mismatch will prevent the commit from succeeding.
+ * This is essential for operations like S3 put to ensure atomicity.
+ */
+ private boolean atomicKeyCreation;
+
public KeyOutputStream(ReplicationConfig replicationConfig,
ContainerClientMetrics clientMetrics) {
this.replication = replicationConfig;
@@ -142,7 +150,8 @@ public class KeyOutputStream extends OutputStream
String requestId, ReplicationConfig replicationConfig,
String uploadID, int partNumber, boolean isMultipart,
boolean unsafeByteBufferConversion,
- ContainerClientMetrics clientMetrics
+ ContainerClientMetrics clientMetrics,
+ boolean atomicKeyCreation
) {
this.config = config;
this.replication = replicationConfig;
@@ -163,6 +172,7 @@ public class KeyOutputStream extends OutputStream
this.isException = false;
this.writeOffset = 0;
this.clientID = handler.getId();
+ this.atomicKeyCreation = atomicKeyCreation;
}
/**
@@ -555,6 +565,12 @@ public class KeyOutputStream extends OutputStream
if (!isException) {
Preconditions.checkArgument(writeOffset == offset);
}
+ if (atomicKeyCreation) {
+ long expectedSize = blockOutputStreamEntryPool.getDataSize();
+ Preconditions.checkState(expectedSize == offset,
+ String.format("Expected: %d and actual %d write sizes do not
match",
+ expectedSize, offset));
+ }
blockOutputStreamEntryPool.commitKey(offset);
} finally {
blockOutputStreamEntryPool.cleanup();
@@ -591,6 +607,7 @@ public class KeyOutputStream extends OutputStream
private OzoneClientConfig clientConfig;
private ReplicationConfig replicationConfig;
private ContainerClientMetrics clientMetrics;
+ private boolean atomicKeyCreation = false;
public String getMultipartUploadID() {
return multipartUploadID;
@@ -677,6 +694,11 @@ public class KeyOutputStream extends OutputStream
return this;
}
+ public Builder setAtomicKeyCreation(boolean atomicKey) {
+ this.atomicKeyCreation = atomicKey;
+ return this;
+ }
+
public Builder setClientMetrics(ContainerClientMetrics clientMetrics) {
this.clientMetrics = clientMetrics;
return this;
@@ -686,6 +708,10 @@ public class KeyOutputStream extends OutputStream
return clientMetrics;
}
+ public boolean getAtomicKeyCreation() {
+ return atomicKeyCreation;
+ }
+
public KeyOutputStream build() {
return new KeyOutputStream(
clientConfig,
@@ -698,7 +724,8 @@ public class KeyOutputStream extends OutputStream
multipartNumber,
isMultipartKey,
unsafeByteBufferConversion,
- clientMetrics);
+ clientMetrics,
+ atomicKeyCreation);
}
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
index d8cb06eccc..c0af1c5301 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
@@ -59,30 +59,35 @@ public class OzoneDataStreamOutput extends
ByteBufferOutputStream
}
public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+ KeyDataStreamOutput keyDataStreamOutput = getKeyDataStreamOutput();
+ if (keyDataStreamOutput != null) {
+ return keyDataStreamOutput.getCommitUploadPartInfo();
+ }
+ // Otherwise return null.
+ return null;
+ }
+
+ public KeyDataStreamOutput getKeyDataStreamOutput() {
if (byteBufferStreamOutput instanceof OzoneOutputStream) {
OutputStream outputStream =
((OzoneOutputStream) byteBufferStreamOutput).getOutputStream();
if (outputStream instanceof KeyDataStreamOutput) {
- return ((KeyDataStreamOutput)
- outputStream).getCommitUploadPartInfo();
+ return ((KeyDataStreamOutput) outputStream);
} else if (outputStream instanceof CryptoOutputStream) {
OutputStream wrappedStream =
((CryptoOutputStream) outputStream).getWrappedStream();
if (wrappedStream instanceof KeyDataStreamOutput) {
- return ((KeyDataStreamOutput) wrappedStream)
- .getCommitUploadPartInfo();
+ return ((KeyDataStreamOutput) wrappedStream);
}
} else if (outputStream instanceof CipherOutputStreamOzone) {
OutputStream wrappedStream =
((CipherOutputStreamOzone) outputStream).getWrappedStream();
if (wrappedStream instanceof KeyDataStreamOutput) {
- return ((KeyDataStreamOutput) wrappedStream)
- .getCommitUploadPartInfo();
+ return ((KeyDataStreamOutput) wrappedStream);
}
}
} else if (byteBufferStreamOutput instanceof KeyDataStreamOutput) {
- return ((KeyDataStreamOutput)
- byteBufferStreamOutput).getCommitUploadPartInfo();
+ return ((KeyDataStreamOutput) byteBufferStreamOutput);
}
// Otherwise return null.
return null;
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
index 093b31b7a5..bd056185e7 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
@@ -123,29 +123,38 @@ public class OzoneOutputStream extends
ByteArrayStreamOutput
}
public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+ KeyOutputStream keyOutputStream = getKeyOutputStream();
+ if (keyOutputStream != null) {
+ return keyOutputStream.getCommitUploadPartInfo();
+ }
+ // Otherwise return null.
+ return null;
+ }
+
+ public OutputStream getOutputStream() {
+ return outputStream;
+ }
+
+ public KeyOutputStream getKeyOutputStream() {
if (outputStream instanceof KeyOutputStream) {
- return ((KeyOutputStream) outputStream).getCommitUploadPartInfo();
+ return ((KeyOutputStream) outputStream);
} else if (outputStream instanceof CryptoOutputStream) {
OutputStream wrappedStream =
((CryptoOutputStream) outputStream).getWrappedStream();
if (wrappedStream instanceof KeyOutputStream) {
- return ((KeyOutputStream) wrappedStream).getCommitUploadPartInfo();
+ return ((KeyOutputStream) wrappedStream);
}
} else if (outputStream instanceof CipherOutputStreamOzone) {
OutputStream wrappedStream =
((CipherOutputStreamOzone) outputStream).getWrappedStream();
if (wrappedStream instanceof KeyOutputStream) {
- return ((KeyOutputStream)wrappedStream).getCommitUploadPartInfo();
+ return ((KeyOutputStream)wrappedStream);
}
}
// Otherwise return null.
return null;
}
- public OutputStream getOutputStream() {
- return outputStream;
- }
-
@Override
public Map<String, String> getMetadata() {
if (outputStream instanceof CryptoOutputStream) {
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index b45a3209f4..bc01d6653b 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -1008,6 +1008,9 @@ public interface ClientProtocol {
*/
void setThreadLocalS3Auth(S3Auth s3Auth);
+
+ void setIsS3Request(boolean isS3Request);
+
/**
* Gets the S3 Authentication information that is attached to the thread.
* @return S3 Authentication information.
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index ad8ced95d1..5d70bdfb86 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -160,6 +160,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
@@ -212,6 +213,7 @@ public class RpcClient implements ClientProtocol {
private final OzoneManagerVersion omVersion;
private volatile ExecutorService ecReconstructExecutor;
private final ContainerClientMetrics clientMetrics;
+ private final AtomicBoolean isS3GRequest = new AtomicBoolean(false);
/**
* Creates RpcClient instance with the given configuration.
@@ -687,7 +689,7 @@ public class RpcClient implements ClientProtocol {
: "with server-side default bucket layout";
LOG.info("Creating Bucket: {}/{}, {}, {} as owner, Versioning {}, " +
"Storage Type set to {} and Encryption set to {}, " +
- "Replication Type set to {}, Namespace Quota set to {}, " +
+ "Replication Type set to {}, Namespace Quota set to {}, " +
"Space Quota set to {} ",
volumeName, bucketName, layoutMsg, owner, isVersionEnabled,
storageType, bek != null, replicationType,
@@ -1346,6 +1348,13 @@ public class RpcClient implements ClientProtocol {
.setLatestVersionLocation(getLatestVersionLocation);
OpenKeySession openKey = ozoneManagerClient.openKey(builder.build());
+ // For bucket with layout OBJECT_STORE, when create an empty file (size=0),
+ // OM will set DataSize to OzoneConfigKeys#OZONE_SCM_BLOCK_SIZE,
+ // which will cause S3G's atomic write length check to fail,
+ // so reset size to 0 here.
+ if (isS3GRequest.get() && size == 0) {
+ openKey.getKeyInfo().setDataSize(size);
+ }
return createOutputStream(openKey);
}
@@ -1787,6 +1796,7 @@ public class RpcClient implements ClientProtocol {
.setMultipartNumber(partNumber)
.setMultipartUploadID(uploadID)
.setIsMultipartKey(true)
+ .setAtomicKeyCreation(isS3GRequest.get())
.build();
return createOutputStream(openKey, keyOutputStream);
}
@@ -1802,7 +1812,9 @@ public class RpcClient implements ClientProtocol {
throws IOException {
final OpenKeySession openKey = newMultipartOpenKey(
volumeName, bucketName, keyName, size, partNumber, uploadID);
-
+ // Amazon S3 never adds partial objects, So for S3 requests we need to
+ // set atomicKeyCreation to true
+ // refer:
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
KeyDataStreamOutput keyOutputStream =
new KeyDataStreamOutput.Builder()
.setHandler(openKey)
@@ -1814,6 +1826,7 @@ public class RpcClient implements ClientProtocol {
.setIsMultipartKey(true)
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
.setConfig(conf.getObject(OzoneClientConfig.class))
+ .setAtomicKeyCreation(isS3GRequest.get())
.build();
keyOutputStream
.addPreallocateBlocks(
@@ -2216,6 +2229,9 @@ public class RpcClient implements ClientProtocol {
throws IOException {
final ReplicationConfig replicationConfig
= openKey.getKeyInfo().getReplicationConfig();
+ // Amazon S3 never adds partial objects, So for S3 requests we need to
+ // set atomicKeyCreation to true
+ // refer:
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
KeyDataStreamOutput keyOutputStream =
new KeyDataStreamOutput.Builder()
.setHandler(openKey)
@@ -2224,6 +2240,7 @@ public class RpcClient implements ClientProtocol {
.setReplicationConfig(replicationConfig)
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
.setConfig(conf.getObject(OzoneClientConfig.class))
+ .setAtomicKeyCreation(isS3GRequest.get())
.build();
keyOutputStream
.addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(),
@@ -2305,6 +2322,7 @@ public class RpcClient implements ClientProtocol {
.setOmClient(ozoneManagerClient)
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
.setConfig(conf.getObject(OzoneClientConfig.class))
+ .setAtomicKeyCreation(isS3GRequest.get())
.setClientMetrics(clientMetrics);
}
@@ -2383,6 +2401,11 @@ public class RpcClient implements ClientProtocol {
ozoneManagerClient.setThreadLocalS3Auth(ozoneSharedSecretAuth);
}
+ @Override
+ public void setIsS3Request(boolean s3Request) {
+ this.isS3GRequest.set(s3Request);
+ }
+
@Override
public S3Auth getThreadLocalS3Auth() {
return ozoneManagerClient.getThreadLocalS3Auth();
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
index 542b8a8a9e..c7a09cd2ce 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
@@ -224,6 +224,40 @@ public class TestOzoneClient {
}
}
+ /**
+ * This test validates that for S3G,
+ * the key upload process needs to be atomic.
+ * It simulates two mismatch scenarios where the actual write data size does
+ * not match the expected size.
+ */
+ @Test
+ public void testPutKeySizeMismatch() throws IOException {
+ String value = new String(new byte[1024], UTF_8);
+ OzoneBucket bucket = getOzoneBucket();
+ String keyName = UUID.randomUUID().toString();
+ try {
+ // Simulating first mismatch: Write less data than expected
+ client.getProxy().setIsS3Request(true);
+ OzoneOutputStream out1 = bucket.createKey(keyName,
+ value.getBytes(UTF_8).length, ReplicationType.RATIS, ONE,
+ new HashMap<>());
+ out1.write(value.substring(0, value.length() - 1).getBytes(UTF_8));
+ Assertions.assertThrows(IllegalStateException.class, out1::close,
+ "Expected IllegalArgumentException due to size mismatch.");
+
+ // Simulating second mismatch: Write more data than expected
+ OzoneOutputStream out2 = bucket.createKey(keyName,
+ value.getBytes(UTF_8).length, ReplicationType.RATIS, ONE,
+ new HashMap<>());
+ value += "1";
+ out2.write(value.getBytes(UTF_8));
+ Assertions.assertThrows(IllegalStateException.class, out2::close,
+ "Expected IllegalArgumentException due to size mismatch.");
+ } finally {
+ client.getProxy().setIsS3Request(false);
+ }
+ }
+
private OzoneBucket getOzoneBucket() throws IOException {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
index 9473467b8b..05b7a62c06 100644
---
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
@@ -127,9 +127,10 @@ public abstract class EndpointBase implements Auditor {
signatureInfo.getSignature(),
signatureInfo.getAwsAccessId(), signatureInfo.getAwsAccessId());
LOG.debug("S3 access id: {}", s3Auth.getAccessID());
- getClient().getObjectStore()
- .getClientProxy()
- .setThreadLocalS3Auth(s3Auth);
+ ClientProtocol clientProtocol =
+ getClient().getObjectStore().getClientProxy();
+ clientProtocol.setThreadLocalS3Auth(s3Auth);
+ clientProtocol.setIsS3Request(true);
init();
}
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 9503c53cfd..d85a628ea3 100644
---
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.KeyMetadataAware;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -123,6 +124,7 @@ import static
org.apache.hadoop.ozone.s3.exception.S3ErrorTable.NO_SUCH_UPLOAD;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.PRECOND_FAILED;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.newError;
import static org.apache.hadoop.ozone.s3.util.S3Consts.ACCEPT_RANGE_HEADER;
+import static
org.apache.hadoop.ozone.s3.util.S3Consts.DECODED_CONTENT_LENGTH_HEADER;
import static org.apache.hadoop.ozone.s3.util.S3Consts.CONTENT_RANGE_HEADER;
import static org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_HEADER;
import static
org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_HEADER_RANGE;
@@ -205,6 +207,7 @@ public class ObjectEndpoint extends EndpointBase {
* See: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html
for
* more details.
*/
+ @SuppressWarnings("checkstyle:MethodLength")
@PUT
public Response put(
@PathParam("bucket") String bucketName,
@@ -217,9 +220,6 @@ public class ObjectEndpoint extends EndpointBase {
S3GAction s3GAction = S3GAction.CREATE_KEY;
boolean auditSuccess = true;
-
- OzoneOutputStream output = null;
-
String copyHeader = null, storageType = null;
try {
OzoneVolume volume = getVolume();
@@ -289,6 +289,7 @@ public class ObjectEndpoint extends EndpointBase {
.equals(headers.getHeaderString("x-amz-content-sha256"))) {
body = new DigestInputStream(new SignedChunksInputStream(body),
E_TAG_PROVIDER.get());
+ length = Long.parseLong(amzDecodedLength);
} else {
body = new DigestInputStream(body, E_TAG_PROVIDER.get());
}
@@ -303,14 +304,16 @@ public class ObjectEndpoint extends EndpointBase {
eTag = keyWriteResult.getKey();
putLength = keyWriteResult.getValue();
} else {
- output = getClientProtocol().createKey(volume.getName(), bucketName,
- keyPath, length, replicationConfig, customMetadata);
- getMetrics().updatePutKeyMetadataStats(startNanos);
- putLength = IOUtils.copyLarge(body, output);
- eTag = DatatypeConverter.printHexBinary(
- ((DigestInputStream) body).getMessageDigest().digest())
- .toLowerCase();
- output.getMetadata().put(ETAG, eTag);
+ try (OzoneOutputStream output = getClientProtocol().createKey(
+ volume.getName(), bucketName, keyPath, length, replicationConfig,
+ customMetadata)) {
+ getMetrics().updatePutKeyMetadataStats(startNanos);
+ putLength = IOUtils.copyLarge(body, output);
+ eTag = DatatypeConverter.printHexBinary(
+ ((DigestInputStream) body).getMessageDigest().digest())
+ .toLowerCase();
+ output.getMetadata().put(ETAG, eTag);
+ }
}
getMetrics().incPutKeySuccessLength(putLength);
@@ -352,9 +355,6 @@ public class ObjectEndpoint extends EndpointBase {
}
throw ex;
} finally {
- if (output != null) {
- output.close();
- }
if (auditSuccess) {
AUDIT.logWriteSuccess(
buildAuditMessageForSuccess(s3GAction, getAuditParameters()));
@@ -845,6 +845,7 @@ public class ObjectEndpoint extends EndpointBase {
}
}
+ @SuppressWarnings("checkstyle:MethodLength")
private Response createMultipartKey(OzoneVolume volume, String bucket,
String key, long length, int partNumber,
String uploadID, InputStream body)
@@ -852,12 +853,13 @@ public class ObjectEndpoint extends EndpointBase {
long startNanos = Time.monotonicNowNanos();
String copyHeader = null;
try {
- OzoneOutputStream ozoneOutputStream = null;
if ("STREAMING-AWS4-HMAC-SHA256-PAYLOAD"
.equals(headers.getHeaderString("x-amz-content-sha256"))) {
body = new DigestInputStream(new SignedChunksInputStream(body),
E_TAG_PROVIDER.get());
+ length = Long.parseLong(
+ headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER));
} else {
body = new DigestInputStream(body, E_TAG_PROVIDER.get());
}
@@ -875,78 +877,96 @@ public class ObjectEndpoint extends EndpointBase {
enableEC = true;
}
- try {
- if (datastreamEnabled && !enableEC && copyHeader == null) {
- getMetrics().updatePutKeyMetadataStats(startNanos);
- return ObjectEndpointStreaming
- .createMultipartKey(ozoneBucket, key, length, partNumber,
- uploadID, chunkSize, (DigestInputStream) body);
+ if (datastreamEnabled && !enableEC && copyHeader == null) {
+ getMetrics().updatePutKeyMetadataStats(startNanos);
+ return ObjectEndpointStreaming
+ .createMultipartKey(ozoneBucket, key, length, partNumber,
+ uploadID, chunkSize, (DigestInputStream) body);
+ }
+ // OmMultipartCommitUploadPartInfo can only be gotten after the
+ // OzoneOutputStream is closed, so we need to save the KeyOutputStream
+ // in the OzoneOutputStream and use it to get the
+ // OmMultipartCommitUploadPartInfo after OzoneOutputStream is closed.
+ KeyOutputStream keyOutputStream = null;
+ if (copyHeader != null) {
+ Pair<String, String> result = parseSourceHeader(copyHeader);
+ String sourceBucket = result.getLeft();
+ String sourceKey = result.getRight();
+
+ OzoneKeyDetails sourceKeyDetails = getClientProtocol().getKeyDetails(
+ volume.getName(), sourceBucket, sourceKey);
+ String range =
+ headers.getHeaderString(COPY_SOURCE_HEADER_RANGE);
+ RangeHeader rangeHeader = null;
+ if (range != null) {
+ rangeHeader = RangeHeaderParserUtil.parseRangeHeader(range, 0);
+ // When copy Range, the size of the target key is the
+ // length specified by COPY_SOURCE_HEADER_RANGE.
+ length = rangeHeader.getEndOffset() -
+ rangeHeader.getStartOffset() + 1;
+ } else {
+ length = sourceKeyDetails.getDataSize();
+ }
+ Long sourceKeyModificationTime = sourceKeyDetails
+ .getModificationTime().toEpochMilli();
+ String copySourceIfModifiedSince =
+ headers.getHeaderString(COPY_SOURCE_IF_MODIFIED_SINCE);
+ String copySourceIfUnmodifiedSince =
+ headers.getHeaderString(COPY_SOURCE_IF_UNMODIFIED_SINCE);
+ if (!checkCopySourceModificationTime(sourceKeyModificationTime,
+ copySourceIfModifiedSince, copySourceIfUnmodifiedSince)) {
+ throw newError(PRECOND_FAILED, sourceBucket + "/" + sourceKey);
}
- ozoneOutputStream = getClientProtocol().createMultipartKey(
- volume.getName(), bucket, key, length, partNumber, uploadID);
-
- if (copyHeader != null) {
- Pair<String, String> result = parseSourceHeader(copyHeader);
-
- String sourceBucket = result.getLeft();
- String sourceKey = result.getRight();
-
- OzoneKeyDetails sourceKeyDetails = getClientProtocol().getKeyDetails(
- volume.getName(), sourceBucket, sourceKey);
- Long sourceKeyModificationTime = sourceKeyDetails
- .getModificationTime().toEpochMilli();
- String copySourceIfModifiedSince =
- headers.getHeaderString(COPY_SOURCE_IF_MODIFIED_SINCE);
- String copySourceIfUnmodifiedSince =
- headers.getHeaderString(COPY_SOURCE_IF_UNMODIFIED_SINCE);
- if (!checkCopySourceModificationTime(sourceKeyModificationTime,
- copySourceIfModifiedSince, copySourceIfUnmodifiedSince)) {
- throw newError(PRECOND_FAILED, sourceBucket + "/" + sourceKey);
- }
- try (OzoneInputStream sourceObject = sourceKeyDetails.getContent()) {
-
- String range =
- headers.getHeaderString(COPY_SOURCE_HEADER_RANGE);
- long copyLength;
- if (range != null) {
- RangeHeader rangeHeader =
- RangeHeaderParserUtil.parseRangeHeader(range, 0);
- final long skipped =
- sourceObject.skip(rangeHeader.getStartOffset());
- if (skipped != rangeHeader.getStartOffset()) {
- throw new EOFException(
- "Bytes to skip: "
- + rangeHeader.getStartOffset() + " actual: " +
skipped);
- }
+ try (OzoneInputStream sourceObject = sourceKeyDetails.getContent()) {
+ long copyLength;
+ if (range != null) {
+ final long skipped =
+ sourceObject.skip(rangeHeader.getStartOffset());
+ if (skipped != rangeHeader.getStartOffset()) {
+ throw new EOFException(
+ "Bytes to skip: "
+ + rangeHeader.getStartOffset() + " actual: " + skipped);
+ }
+ try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
+ .createMultipartKey(volume.getName(), bucket, key, length,
+ partNumber, uploadID)) {
getMetrics().updateCopyKeyMetadataStats(startNanos);
- copyLength = IOUtils.copyLarge(sourceObject, ozoneOutputStream,
0,
- rangeHeader.getEndOffset() - rangeHeader.getStartOffset()
- + 1);
- } else {
+ copyLength = IOUtils.copyLarge(
+ sourceObject, ozoneOutputStream, 0, length);
+ keyOutputStream = ozoneOutputStream.getKeyOutputStream();
+ }
+ } else {
+ try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
+ .createMultipartKey(volume.getName(), bucket, key, length,
+ partNumber, uploadID)) {
getMetrics().updateCopyKeyMetadataStats(startNanos);
copyLength = IOUtils.copyLarge(sourceObject, ozoneOutputStream);
+ keyOutputStream = ozoneOutputStream.getKeyOutputStream();
}
- getMetrics().incCopyObjectSuccessLength(copyLength);
}
- } else {
+ getMetrics().incCopyObjectSuccessLength(copyLength);
+ }
+ } else {
+ long putLength;
+ try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
+ .createMultipartKey(volume.getName(), bucket, key, length,
+ partNumber, uploadID)) {
getMetrics().updatePutKeyMetadataStats(startNanos);
- long putLength = IOUtils.copyLarge(body, ozoneOutputStream);
+ putLength = IOUtils.copyLarge(body, ozoneOutputStream);
((KeyMetadataAware)ozoneOutputStream.getOutputStream())
- .getMetadata().put("ETag", DatatypeConverter.printHexBinary(
- ((DigestInputStream) body).getMessageDigest().digest())
+ .getMetadata().put(ETAG, DatatypeConverter.printHexBinary(
+ ((DigestInputStream) body).getMessageDigest().digest())
.toLowerCase());
- getMetrics().incPutKeySuccessLength(putLength);
- }
- } finally {
- if (ozoneOutputStream != null) {
- ozoneOutputStream.close();
+ keyOutputStream
+ = ozoneOutputStream.getKeyOutputStream();
}
+ getMetrics().incPutKeySuccessLength(putLength);
}
- assert ozoneOutputStream != null;
+ assert keyOutputStream != null;
OmMultipartCommitUploadPartInfo omMultipartCommitUploadPartInfo =
- ozoneOutputStream.getCommitUploadPartInfo();
+ keyOutputStream.getCommitUploadPartInfo();
String eTag = omMultipartCommitUploadPartInfo.getPartName();
if (copyHeader != null) {
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
index ef87ad450d..b536b3248b 100644
---
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
@@ -22,6 +22,7 @@ import javax.xml.bind.DatatypeConverter;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput;
import org.apache.hadoop.ozone.client.io.KeyMetadataAware;
import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -145,18 +146,24 @@ final class ObjectEndpointStreaming {
String uploadID, int chunkSize,
DigestInputStream body)
throws IOException, OS3Exception {
- OzoneDataStreamOutput streamOutput = null;
String eTag;
S3GatewayMetrics metrics = S3GatewayMetrics.create();
+ // OmMultipartCommitUploadPartInfo can only be gotten after the
+ // OzoneDataStreamOutput is closed, so we need to save the
+ // KeyDataStreamOutput in the OzoneDataStreamOutput and use it to get the
+ // OmMultipartCommitUploadPartInfo after OzoneDataStreamOutput is closed.
+ KeyDataStreamOutput keyDataStreamOutput = null;
try {
- streamOutput = ozoneBucket
- .createMultipartStreamKey(key, length, partNumber, uploadID);
- long putLength =
- writeToStreamOutput(streamOutput, body, chunkSize, length);
- eTag = DatatypeConverter.printHexBinary(body.getMessageDigest().digest())
- .toLowerCase();
- ((KeyMetadataAware)streamOutput).getMetadata().put("ETag", eTag);
- metrics.incPutKeySuccessLength(putLength);
+ try (OzoneDataStreamOutput streamOutput = ozoneBucket
+ .createMultipartStreamKey(key, length, partNumber, uploadID)) {
+ long putLength =
+ writeToStreamOutput(streamOutput, body, chunkSize, length);
+ eTag = DatatypeConverter.printHexBinary(
+ body.getMessageDigest().digest()).toLowerCase();
+ ((KeyMetadataAware)streamOutput).getMetadata().put("ETag", eTag);
+ metrics.incPutKeySuccessLength(putLength);
+ keyDataStreamOutput = streamOutput.getKeyDataStreamOutput();
+ }
} catch (OMException ex) {
if (ex.getResult() ==
OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR) {
@@ -168,10 +175,9 @@ final class ObjectEndpointStreaming {
}
throw ex;
} finally {
- if (streamOutput != null) {
- streamOutput.close();
+ if (keyDataStreamOutput != null) {
OmMultipartCommitUploadPartInfo commitUploadPartInfo =
- streamOutput.getCommitUploadPartInfo();
+ keyDataStreamOutput.getCommitUploadPartInfo();
eTag = commitUploadPartInfo.getPartName();
}
}
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
index 5505688b26..71f4784350 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
@@ -577,6 +577,11 @@ public class ClientProtocolStub implements ClientProtocol {
}
+ @Override
+ public void setIsS3Request(boolean isS3Request) {
+
+ }
+
@Override
public S3Auth getThreadLocalS3Auth() {
return null;
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
index d546afe2c7..fad3386c61 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
@@ -512,7 +512,7 @@ public class OzoneBucketStub extends OzoneBucket {
if (partEntry.getKey() > partNumberMarker) {
PartInfo partInfo = new PartInfo(partEntry.getKey(),
partEntry.getValue().getPartName(),
- partEntry.getValue().getContent().length, Time.now());
+ Time.now(), partEntry.getValue().getContent().length);
partInfoList.add(partInfo);
count++;
}
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
index 326d388b88..00a7ba5574 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
@@ -20,6 +20,9 @@
package org.apache.hadoop.ozone.client;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
@@ -69,6 +72,18 @@ public class OzoneOutputStreamStub extends OzoneOutputStream
{
}
}
+ @Override
+ public KeyOutputStream getKeyOutputStream() {
+ return new KeyOutputStream(
+ ReplicationConfig.getDefault(new OzoneConfiguration()), null) {
+ @Override
+ public synchronized OmMultipartCommitUploadPartInfo
+ getCommitUploadPartInfo() {
+ return OzoneOutputStreamStub.this.getCommitUploadPartInfo();
+ }
+ };
+ }
+
@Override
public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
return closed ? new OmMultipartCommitUploadPartInfo(partName) : null;
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java
index 7422806db7..7186ceb557 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientStub;
+import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
import org.apache.hadoop.ozone.s3.endpoint.CompleteMultipartUploadRequest.Part;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
@@ -385,6 +386,28 @@ public class TestMultipartUploadWithCopy {
return part;
}
+ @Test
+ public void testUploadWithRangeCopyContentLength()
+ throws IOException, OS3Exception {
+ // The contentLength specified when creating the Key should be the same as
+ // the Content-Length, the key Commit will compare the Content-Length with
+ // the actual length of the data written.
+
+ String uploadID = initiateMultipartUpload(KEY);
+ ByteArrayInputStream body = new ByteArrayInputStream("".getBytes(UTF_8));
+ Map<String, String> additionalHeaders = new HashMap<>();
+ additionalHeaders.put(COPY_SOURCE_HEADER,
+ OzoneConsts.S3_BUCKET + "/" + EXISTING_KEY);
+ additionalHeaders.put(COPY_SOURCE_HEADER_RANGE, "bytes=0-3");
+ setHeaders(additionalHeaders);
+ REST.put(OzoneConsts.S3_BUCKET, KEY, 0, 1, uploadID, body);
+ OzoneMultipartUploadPartListParts parts =
+ CLIENT.getObjectStore().getS3Bucket(OzoneConsts.S3_BUCKET)
+ .listParts(KEY, uploadID, 0, 100);
+ Assert.assertEquals(1, parts.getPartInfoList().size());
+ Assert.assertEquals(4, parts.getPartInfoList().get(0).getSize());
+ }
+
private void completeMultipartUpload(String key,
CompleteMultipartUploadRequest completeMultipartUploadRequest,
String uploadID) throws IOException, OS3Exception {
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
index 55c661084b..4c1fd9a7aa 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
@@ -49,6 +49,7 @@ import org.junit.jupiter.api.Assertions;
import org.mockito.Mockito;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.hadoop.ozone.s3.util.S3Consts.DECODED_CONTENT_LENGTH_HEADER;
import static org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_HEADER;
import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
import static org.apache.hadoop.ozone.s3.util.S3Utils.urlEncode;
@@ -140,6 +141,47 @@ public class TestObjectPut {
Assert.assertEquals(CONTENT, keyContent);
}
+ @Test
+ public void testPutObjectContentLength() throws IOException, OS3Exception {
+ // The contentLength specified when creating the Key should be the same as
+ // the Content-Length, the key Commit will compare the Content-Length with
+ // the actual length of the data written.
+ HttpHeaders headers = Mockito.mock(HttpHeaders.class);
+ ByteArrayInputStream body =
+ new ByteArrayInputStream(CONTENT.getBytes(UTF_8));
+ objectEndpoint.setHeaders(headers);
+ long dataSize = CONTENT.length();
+
+ objectEndpoint.put(bucketName, keyName, dataSize, 0, null, body);
+ Assert.assertEquals(dataSize, getKeyDataSize(keyName));
+ }
+
+ @Test
+ public void testPutObjectContentLengthForStreaming()
+ throws IOException, OS3Exception {
+ HttpHeaders headers = Mockito.mock(HttpHeaders.class);
+ objectEndpoint.setHeaders(headers);
+
+ String chunkedContent = "0a;chunk-signature=signature\r\n"
+ + "1234567890\r\n"
+ + "05;chunk-signature=signature\r\n"
+ + "abcde\r\n";
+
+ when(headers.getHeaderString("x-amz-content-sha256"))
+ .thenReturn("STREAMING-AWS4-HMAC-SHA256-PAYLOAD");
+
+ when(headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER))
+ .thenReturn("15");
+ objectEndpoint.put(bucketName, keyName, chunkedContent.length(), 0, null,
+ new ByteArrayInputStream(chunkedContent.getBytes(UTF_8)));
+ Assert.assertEquals(15, getKeyDataSize(keyName));
+ }
+
+ private long getKeyDataSize(String key) throws IOException {
+ return clientStub.getObjectStore().getS3Bucket(bucketName)
+ .getKey(key).getDataSize();
+ }
+
@Test
public void testPutObjectWithSignedChunks() throws IOException, OS3Exception
{
//GIVEN
@@ -153,6 +195,8 @@ public class TestObjectPut {
when(headers.getHeaderString("x-amz-content-sha256"))
.thenReturn("STREAMING-AWS4-HMAC-SHA256-PAYLOAD");
+ when(headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER))
+ .thenReturn("15");
//WHEN
Response response = objectEndpoint.put(bucketName, keyName,
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
index 1b0e808f2d..6ba1b557ec 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
@@ -24,7 +24,9 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientStub;
+import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
@@ -33,9 +35,12 @@ import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.UUID;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.hadoop.ozone.s3.util.S3Consts.DECODED_CONTENT_LENGTH_HEADER;
import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
@@ -49,11 +54,12 @@ import static org.mockito.Mockito.when;
public class TestPartUpload {
private static final ObjectEndpoint REST = new ObjectEndpoint();
+ private static OzoneClient client;
@BeforeClass
public static void setUp() throws Exception {
- OzoneClient client = new OzoneClientStub();
+ client = new OzoneClientStub();
client.getObjectStore().createS3Bucket(OzoneConsts.S3_BUCKET);
@@ -135,4 +141,69 @@ public class TestPartUpload {
assertEquals(HTTP_NOT_FOUND, ex.getHttpCode());
}
}
+
+ @Test
+ public void testPartUploadStreamContentLength()
+ throws IOException, OS3Exception {
+ HttpHeaders headers = Mockito.mock(HttpHeaders.class);
+ ObjectEndpoint objectEndpoint = new ObjectEndpoint();
+ objectEndpoint.setHeaders(headers);
+ objectEndpoint.setClient(client);
+ objectEndpoint.setOzoneConfiguration(new OzoneConfiguration());
+ String keyName = UUID.randomUUID().toString();
+
+ String chunkedContent = "0a;chunk-signature=signature\r\n"
+ + "1234567890\r\n"
+ + "05;chunk-signature=signature\r\n"
+ + "abcde\r\n";
+ when(headers.getHeaderString("x-amz-content-sha256"))
+ .thenReturn("STREAMING-AWS4-HMAC-SHA256-PAYLOAD");
+ when(headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER))
+ .thenReturn("15");
+
+ Response response = objectEndpoint.initializeMultipartUpload(
+ OzoneConsts.S3_BUCKET, keyName);
+ MultipartUploadInitiateResponse multipartUploadInitiateResponse =
+ (MultipartUploadInitiateResponse) response.getEntity();
+ assertNotNull(multipartUploadInitiateResponse.getUploadID());
+ String uploadID = multipartUploadInitiateResponse.getUploadID();
+ long contentLength = chunkedContent.length();
+
+ objectEndpoint.put(OzoneConsts.S3_BUCKET, keyName, contentLength, 1,
+ uploadID, new ByteArrayInputStream(chunkedContent.getBytes(UTF_8)));
+ assertContentLength(uploadID, keyName, 15);
+ }
+
+ @Test
+ public void testPartUploadContentLength() throws IOException, OS3Exception {
+ // The contentLength specified when creating the Key should be the same as
+ // the Content-Length, the key Commit will compare the Content-Length with
+ // the actual length of the data written.
+
+ String keyName = UUID.randomUUID().toString();
+ Response response = REST.initializeMultipartUpload(OzoneConsts.S3_BUCKET,
+ keyName);
+ MultipartUploadInitiateResponse multipartUploadInitiateResponse =
+ (MultipartUploadInitiateResponse) response.getEntity();
+ assertNotNull(multipartUploadInitiateResponse.getUploadID());
+ String uploadID = multipartUploadInitiateResponse.getUploadID();
+ String content = "Multipart Upload";
+ long contentLength = content.length();
+
+ ByteArrayInputStream body =
+ new ByteArrayInputStream(content.getBytes(UTF_8));
+ REST.put(OzoneConsts.S3_BUCKET, keyName,
+ contentLength, 1, uploadID, body);
+ assertContentLength(uploadID, keyName, content.length());
+ }
+
+ private void assertContentLength(String uploadID, String key,
+ long contentLength) throws IOException {
+ OzoneMultipartUploadPartListParts parts =
+ client.getObjectStore().getS3Bucket(OzoneConsts.S3_BUCKET)
+ .listParts(key, uploadID, 0, 100);
+ Assert.assertEquals(1, parts.getPartInfoList().size());
+ Assert.assertEquals(contentLength,
+ parts.getPartInfoList().get(0).getSize());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]