This is an automated email from the ASF dual-hosted git repository.
bharat pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1d98a21 HDDS-1819. Implement S3 Commit MPU request to use Cache and DoubleBuffer. (#1140)
1d98a21 is described below
commit 1d98a212cb729b458468af44287dff257fed97e3
Author: Bharat Viswanadham <[email protected]>
AuthorDate: Wed Jul 24 14:03:40 2019 -0700
HDDS-1819. Implement S3 Commit MPU request to use Cache and DoubleBuffer. (#1140)
---
.../om/ratis/utils/OzoneManagerRatisUtils.java | 3 +
.../S3MultipartUploadCommitPartRequest.java | 217 +++++++++++++++++++++
.../S3MultipartUploadCommitPartResponse.java | 109 +++++++++++
.../OzoneManagerHARequestHandlerImpl.java | 15 +-
.../ozone/om/request/TestOMRequestUtils.java | 32 +++
.../TestS3InitiateMultipartUploadRequest.java | 37 +---
.../s3/multipart/TestS3MultipartRequest.java | 68 +++++++
.../TestS3MultipartUploadCommitPartRequest.java | 209 ++++++++++++++++++++
8 files changed, 653 insertions(+), 37 deletions(-)
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index c86f27a..84a6530 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.ozone.om.request.key.OMKeyRenameRequest;
import org.apache.hadoop.ozone.om.request.s3.bucket.S3BucketCreateRequest;
import org.apache.hadoop.ozone.om.request.s3.bucket.S3BucketDeleteRequest;
import org.apache.hadoop.ozone.om.request.s3.multipart.S3InitiateMultipartUploadRequest;
+import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCommitPartRequest;
import org.apache.hadoop.ozone.om.request.volume.OMVolumeCreateRequest;
import org.apache.hadoop.ozone.om.request.volume.OMVolumeDeleteRequest;
import org.apache.hadoop.ozone.om.request.volume.OMVolumeSetOwnerRequest;
@@ -108,6 +109,8 @@ public final class OzoneManagerRatisUtils {
return new S3BucketDeleteRequest(omRequest);
case InitiateMultiPartUpload:
return new S3InitiateMultipartUploadRequest(omRequest);
+ case CommitMultiPartUpload:
+ return new S3MultipartUploadCommitPartRequest(omRequest);
default:
// TODO: will update once all request types are implemented.
return null;
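
For context: the switch above is the factory that turns each Ratis-applied
OMRequest into its typed handler. A minimal sketch of the pattern, with the
method body trimmed down to the two S3 multipart cases this commit touches
(the surrounding harness is illustrative, not the full method):

    // Sketch: type-based dispatch from an OMRequest to its handler.
    public static OMClientRequest createClientRequest(OMRequest omRequest) {
      switch (omRequest.getCmdType()) {
      case InitiateMultiPartUpload:
        return new S3InitiateMultipartUploadRequest(omRequest);
      case CommitMultiPartUpload:
        return new S3MultipartUploadCommitPartRequest(omRequest);
      default:
        // Remaining request types are not yet on the new model.
        return null;
      }
    }
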
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java
new file mode 100644
index 0000000..fc3daec
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.s3.multipart;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.s3.multipart
+ .S3MultipartUploadCommitPartResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .MultipartCommitUploadPartRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .MultipartCommitUploadPartResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .OMResponse;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.db.cache.CacheKey;
+import org.apache.hadoop.utils.db.cache.CacheValue;
+
+import java.io.IOException;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+
+/**
+ * Handles commit of a part for an S3 multipart upload.
+ */
+public class S3MultipartUploadCommitPartRequest extends OMKeyRequest {
+
+ public S3MultipartUploadCommitPartRequest(OMRequest omRequest) {
+ super(omRequest);
+ }
+
+ @Override
+ public OMRequest preExecute(OzoneManager ozoneManager) {
+ MultipartCommitUploadPartRequest multipartCommitUploadPartRequest =
+ getOmRequest().getCommitMultiPartUploadRequest();
+
+ return getOmRequest().toBuilder().setCommitMultiPartUploadRequest(
+ multipartCommitUploadPartRequest.toBuilder()
+ .setKeyArgs(multipartCommitUploadPartRequest.getKeyArgs()
+ .toBuilder().setModificationTime(Time.now())))
+ .setUserInfo(getUserInfo()).build();
+ }
+
+ @Override
+ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+ long transactionLogIndex) {
+ MultipartCommitUploadPartRequest multipartCommitUploadPartRequest =
+ getOmRequest().getCommitMultiPartUploadRequest();
+
+ OzoneManagerProtocolProtos.KeyArgs keyArgs =
+ multipartCommitUploadPartRequest.getKeyArgs();
+
+ String volumeName = keyArgs.getVolumeName();
+ String bucketName = keyArgs.getBucketName();
+ String keyName = keyArgs.getKeyName();
+
+ OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+ ozoneManager.getMetrics().incNumCommitMultipartUploadParts();
+
+ boolean acquiredLock = false;
+ OmMultipartKeyInfo multipartKeyInfo = null;
+ OmKeyInfo omKeyInfo = null;
+ String openKey = null;
+ String multipartKey = null;
+ OzoneManagerProtocolProtos.PartKeyInfo oldPartKeyInfo = null;
+ IOException exception = null;
+ String partName = null;
+ try {
+ // check Acl
+ if (ozoneManager.getAclsEnabled()) {
+ checkAcls(ozoneManager, OzoneObj.ResourceType.KEY,
+ OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE,
+ volumeName, bucketName, keyName);
+ }
+
+ acquiredLock =
+ omMetadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName,
+ bucketName);
+
+ validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+
+ String uploadID = keyArgs.getMultipartUploadID();
+ multipartKey = omMetadataManager.getMultipartKey(volumeName, bucketName,
+ keyName, uploadID);
+
+ multipartKeyInfo = omMetadataManager
+ .getMultipartInfoTable().get(multipartKey);
+
+ long clientID = multipartCommitUploadPartRequest.getClientID();
+
+ openKey = omMetadataManager.getOpenKey(
+ volumeName, bucketName, keyName, clientID);
+
+ omKeyInfo = omMetadataManager.getOpenKeyTable().get(openKey);
+
+
+ if (omKeyInfo == null) {
+      throw new OMException("Failed to commit Multipart Upload key, as " +
+          openKey + " entry is not found in the openKey table", KEY_NOT_FOUND);
+ }
+
+ // set the data size and location info list
+ omKeyInfo.setDataSize(keyArgs.getDataSize());
+ omKeyInfo.updateLocationInfoList(keyArgs.getKeyLocationsList().stream()
+ .map(OmKeyLocationInfo::getFromProtobuf)
+ .collect(Collectors.toList()));
+ // Set Modification time
+ omKeyInfo.setModificationTime(keyArgs.getModificationTime());
+
+ partName = omMetadataManager.getOzoneKey(volumeName, bucketName,
+ keyName) + clientID;
+
+ if (multipartKeyInfo == null) {
+        // This can occur when a user has started uploading a part and,
+        // before the commit of that part happens, requests an abort of the
+        // multipart upload. If we just threw an exception, the data would
+        // never be garbage collected, so move this part to the delete table
+        // (handled in S3MultipartUploadCommitPartResponse) and throw the
+        // error.
+        throw new OMException("No such multipart upload with the specified " +
+            "uploadId " + uploadID,
+            OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
+ } else {
+ int partNumber = keyArgs.getMultipartNumber();
+ oldPartKeyInfo = multipartKeyInfo.getPartKeyInfo(partNumber);
+
+ // Build this multipart upload part info.
+ OzoneManagerProtocolProtos.PartKeyInfo.Builder partKeyInfo =
+ OzoneManagerProtocolProtos.PartKeyInfo.newBuilder();
+ partKeyInfo.setPartName(partName);
+ partKeyInfo.setPartNumber(partNumber);
+ partKeyInfo.setPartKeyInfo(omKeyInfo.getProtobuf());
+
+ // Add this part information in to multipartKeyInfo.
+ multipartKeyInfo.addPartKeyInfo(partNumber, partKeyInfo.build());
+
+        // Add to cache: delete from the open key table and add the updated
+        // entry to the multipart info table. No cache entry is needed for
+        // the delete table, as no read/write requests use that table for
+        // validation.
+ omMetadataManager.getMultipartInfoTable().addCacheEntry(
+ new CacheKey<>(multipartKey),
+ new CacheValue<>(Optional.of(multipartKeyInfo),
+ transactionLogIndex));
+
+ omMetadataManager.getOpenKeyTable().addCacheEntry(
+ new CacheKey<>(openKey),
+ new CacheValue<>(Optional.absent(), transactionLogIndex));
+ }
+
+ } catch (IOException ex) {
+ exception = ex;
+ } finally {
+ if (acquiredLock) {
+ omMetadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+ bucketName);
+ }
+ }
+
+ // audit log
+ auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
+        OMAction.COMMIT_MULTIPART_UPLOAD_PARTKEY, buildKeyArgsAuditMap(keyArgs),
+ exception, getOmRequest().getUserInfo()));
+
+ OMResponse.Builder omResponse = OMResponse.newBuilder()
+ .setCmdType(OzoneManagerProtocolProtos.Type.CommitMultiPartUpload)
+ .setStatus(OzoneManagerProtocolProtos.Status.OK)
+ .setSuccess(true);
+
+ if (exception == null) {
+      omResponse.setCommitMultiPartUploadResponse(
+          MultipartCommitUploadPartResponse.newBuilder().setPartName(partName));
+ return new S3MultipartUploadCommitPartResponse(multipartKey, openKey,
+ keyArgs.getModificationTime(), omKeyInfo, multipartKeyInfo,
+ oldPartKeyInfo, omResponse.build());
+ } else {
+ ozoneManager.getMetrics().incNumCommitMultipartUploadPartFails();
+ return new S3MultipartUploadCommitPartResponse(multipartKey, openKey,
+ keyArgs.getModificationTime(), omKeyInfo, multipartKeyInfo,
+ oldPartKeyInfo, createErrorOMResponse(omResponse, exception));
+
+ }
+ }
+}
+
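
For context on the "Cache" half of the title: validateAndUpdateCache above
mutates only the table caches, keyed by the Ratis transaction index, and
defers the actual RocksDB write to the double buffer. A minimal sketch of
those cache semantics; the helper class is hypothetical, while the
CacheKey/CacheValue calls match the imports above:

    import com.google.common.base.Optional;
    import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
    import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
    import org.apache.hadoop.utils.db.Table;
    import org.apache.hadoop.utils.db.cache.CacheKey;
    import org.apache.hadoop.utils.db.cache.CacheValue;

    // Hypothetical helper isolating the two cache moves made above: an
    // upsert into the multipart info table and a tombstone for the open key.
    final class CommitPartCacheSketch {
      static void apply(Table<String, OmMultipartKeyInfo> multipartInfoTable,
          Table<String, OmKeyInfo> openKeyTable, String multipartKey,
          String openKey, OmMultipartKeyInfo multipartKeyInfo, long txIndex) {
        // Readers see the new part info immediately, before any DB flush.
        multipartInfoTable.addCacheEntry(new CacheKey<>(multipartKey),
            new CacheValue<>(Optional.of(multipartKeyInfo), txIndex));
        // Optional.absent() marks a delete: get(openKey) now returns null
        // even though RocksDB still holds the row until the flush runs.
        openKeyTable.addCacheEntry(new CacheKey<>(openKey),
            new CacheValue<>(Optional.absent(), txIndex));
      }
    }
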
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCommitPartResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCommitPartResponse.java
new file mode 100644
index 0000000..2d76a40
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCommitPartResponse.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.s3.multipart;
+
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .OMResponse;
+import org.apache.hadoop.utils.db.BatchOperation;
+
+import java.io.IOException;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .Status.NO_SUCH_MULTIPART_UPLOAD_ERROR;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .Status.OK;
+
+/**
+ * Response for S3MultipartUploadCommitPart request.
+ */
+public class S3MultipartUploadCommitPartResponse extends OMClientResponse {
+
+ private String multipartKey;
+ private String openKey;
+ private long deleteTimeStamp;
+ private OmKeyInfo deletePartKeyInfo;
+ private OmMultipartKeyInfo omMultipartKeyInfo;
+ private OzoneManagerProtocolProtos.PartKeyInfo oldMultipartKeyInfo;
+
+
+ public S3MultipartUploadCommitPartResponse(String multipartKey,
+ String openKey, long deleteTimeStamp,
+ OmKeyInfo deletePartKeyInfo, OmMultipartKeyInfo omMultipartKeyInfo,
+ OzoneManagerProtocolProtos.PartKeyInfo oldPartKeyInfo,
+ OMResponse omResponse) {
+ super(omResponse);
+ this.multipartKey = multipartKey;
+ this.openKey = openKey;
+ this.deleteTimeStamp = deleteTimeStamp;
+ this.deletePartKeyInfo = deletePartKeyInfo;
+ this.omMultipartKeyInfo = omMultipartKeyInfo;
+ this.oldMultipartKeyInfo = oldPartKeyInfo;
+ }
+
+
+ @Override
+ public void addToDBBatch(OMMetadataManager omMetadataManager,
+ BatchOperation batchOperation) throws IOException {
+
+
+ if (getOMResponse().getStatus() == NO_SUCH_MULTIPART_UPLOAD_ERROR) {
+      // This means that by the time we tried to commit the part, someone
+      // had aborted this multipart upload. So, delete this part information.
+ omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
+ OmUtils.getDeletedKeyName(openKey, deleteTimeStamp),
+ deletePartKeyInfo);
+ }
+
+ if (getOMResponse().getStatus() == OK) {
+
+      // On success we need to do three things:
+      // 1. Add the old part (if any) to the delete table.
+      // 2. Commit the multipart info, which now records this new part.
+      // 3. Delete this new part's entry from the open key table.
+
+      // An old part entry here means an earlier upload of this part number
+      // exists for this multipart upload, so delete it.
+ if (oldMultipartKeyInfo != null) {
+ omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
+ OmUtils.getDeletedKeyName(oldMultipartKeyInfo.getPartName(),
+ deleteTimeStamp),
+ OmKeyInfo.getFromProtobuf(oldMultipartKeyInfo.getPartKeyInfo()));
+ }
+
+
+ omMetadataManager.getMultipartInfoTable().putWithBatch(batchOperation,
+ multipartKey, omMultipartKeyInfo);
+
+ // This information has been added to multipartKeyInfo. So, we can
+ // safely delete part key info from open key table.
+ omMetadataManager.getOpenKeyTable().deleteWithBatch(batchOperation,
+ openKey);
+
+
+ }
+ }
+
+}
+
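
For context on the "DoubleBuffer" half: addToDBBatch above is not called on
the request path. The double buffer drains completed responses in the
background and replays each one into a single atomic RocksDB batch. A
simplified, hypothetical version of that flush loop (the real one lives in
OzoneManagerDoubleBuffer and also tracks flushed transaction indexes):

    import java.io.IOException;
    import org.apache.hadoop.ozone.om.OMMetadataManager;
    import org.apache.hadoop.ozone.om.response.OMClientResponse;
    import org.apache.hadoop.utils.db.BatchOperation;

    // Sketch only: drain one swapped-out buffer into one atomic batch.
    final class DoubleBufferFlushSketch {
      static void flush(OMMetadataManager omMetadataManager,
          Iterable<OMClientResponse> readyBuffer) throws IOException {
        try (BatchOperation batch =
            omMetadataManager.getStore().initBatchOperation()) {
          for (OMClientResponse response : readyBuffer) {
            // e.g. S3MultipartUploadCommitPartResponse.addToDBBatch above.
            response.addToDBBatch(omMetadataManager, batch);
          }
          // Every put/delete from every response commits together, or not
          // at all.
          omMetadataManager.getStore().commitBatchOperation(batch);
        }
      }
    }
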
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java
index 44f9b90..2043942 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java
@@ -27,8 +27,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
- .Status;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.Type;
/**
@@ -70,6 +68,7 @@ public class OzoneManagerHARequestHandlerImpl
case CreateS3Bucket:
case DeleteS3Bucket:
case InitiateMultiPartUpload:
+ case CommitMultiPartUpload:
//TODO: We don't need to pass transactionID; this will be removed when
// all write requests are changed to the new model. We can also
// return OMClientResponse, then adding to doubleBuffer can be taken
@@ -81,12 +80,12 @@ public class OzoneManagerHARequestHandlerImpl
omClientRequest.validateAndUpdateCache(getOzoneManager(),
transactionLogIndex);
- // If any error we have got when validateAndUpdateCache, OMResponse
- // Status is set with Error Code other than OK, in that case don't
- // add this to double buffer.
- if (omClientResponse.getOMResponse().getStatus() == Status.OK) {
- ozoneManagerDoubleBuffer.add(omClientResponse, transactionLogIndex);
- }
+
+      // Add the OMClientResponse to the double buffer. Each response
+      // handles for itself what needs to be done in the error case.
+ ozoneManagerDoubleBuffer.add(omClientResponse, transactionLogIndex);
+
return omClientResponse.getOMResponse();
default:
// As all request types are not changed so we need to call handle
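
The change above is behavioral: previously only Status.OK responses reached
the double buffer; now every response is added, and each response decides in
addToDBBatch what, if anything, to persist. A hedged sketch of the guard a
typical OMClientResponse subclass would carry under this model
(S3MultipartUploadCommitPartResponse above is the notable exception, since
it also writes cleanup entries on NO_SUCH_MULTIPART_UPLOAD_ERROR):

    // Illustrative method body for an OMClientResponse subclass: skip all
    // DB mutations unless the request succeeded, so flushing a failed
    // response is a harmless no-op.
    @Override
    public void addToDBBatch(OMMetadataManager omMetadataManager,
        BatchOperation batchOperation) throws IOException {
      if (getOMResponse().getStatus() == OzoneManagerProtocolProtos.Status.OK) {
        // Apply this response's puts/deletes to the batch here.
      }
    }
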
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java
index 96f3fae..d186ad6 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java
@@ -36,6 +36,8 @@ import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.request.s3.bucket.S3BucketCreateRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .MultipartCommitUploadPartRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartInfoInitiateRequest;
@@ -316,4 +318,34 @@ public final class TestOMRequestUtils {
.build();
}
+  /**
+   * Create an OMRequest which encapsulates a MultipartCommitUploadPart
+   * request.
+   * @param volumeName
+   * @param bucketName
+   * @param keyName
+   * @param clientID
+   * @param size
+   * @param multipartUploadID
+   * @param partNumber
+   */
+ public static OMRequest createCommitPartMPURequest(String volumeName,
+ String bucketName, String keyName, long clientID, long size,
+ String multipartUploadID, int partNumber) {
+
+ // Just set dummy size.
+ KeyArgs.Builder keyArgs =
+ KeyArgs.newBuilder().setVolumeName(volumeName).setKeyName(keyName)
+ .setBucketName(bucketName)
+ .setDataSize(size)
+ .setMultipartNumber(partNumber)
+ .setMultipartUploadID(multipartUploadID)
+ .addAllKeyLocations(new ArrayList<>());
+    // An empty key locations list is enough, as this is for UTs only.
+
+ MultipartCommitUploadPartRequest multipartCommitUploadPartRequest =
+ MultipartCommitUploadPartRequest.newBuilder()
+ .setKeyArgs(keyArgs).setClientID(clientID).build();
+
+ return OMRequest.newBuilder().setClientId(UUID.randomUUID().toString())
+ .setCmdType(OzoneManagerProtocolProtos.Type.CommitMultiPartUpload)
+ .setCommitMultiPartUploadRequest(multipartCommitUploadPartRequest)
+ .build();
+ }
+
}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java
index 99d8de8..619293e 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java
@@ -37,8 +37,8 @@ public class TestS3InitiateMultipartUploadRequest
@Test
public void testPreExecute() {
- doPreExecute(UUID.randomUUID().toString(), UUID.randomUUID().toString(),
- UUID.randomUUID().toString());
+ doPreExecuteInitiateMPU(UUID.randomUUID().toString(),
+ UUID.randomUUID().toString(), UUID.randomUUID().toString());
}
@@ -52,7 +52,8 @@ public class TestS3InitiateMultipartUploadRequest
TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
omMetadataManager);
- OMRequest modifiedRequest = doPreExecute(volumeName, bucketName, keyName);
+ OMRequest modifiedRequest = doPreExecuteInitiateMPU(volumeName,
+ bucketName, keyName);
S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
new S3InitiateMultipartUploadRequest(modifiedRequest);
@@ -97,7 +98,8 @@ public class TestS3InitiateMultipartUploadRequest
TestOMRequestUtils.addVolumeToDB(volumeName, omMetadataManager);
- OMRequest modifiedRequest = doPreExecute(volumeName, bucketName, keyName);
+ OMRequest modifiedRequest = doPreExecuteInitiateMPU(
+ volumeName, bucketName, keyName);
S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
new S3InitiateMultipartUploadRequest(modifiedRequest);
@@ -126,7 +128,8 @@ public class TestS3InitiateMultipartUploadRequest
String keyName = UUID.randomUUID().toString();
- OMRequest modifiedRequest = doPreExecute(volumeName, bucketName, keyName);
+ OMRequest modifiedRequest = doPreExecuteInitiateMPU(volumeName, bucketName,
+ keyName);
S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
new S3InitiateMultipartUploadRequest(modifiedRequest);
@@ -147,28 +150,4 @@ public class TestS3InitiateMultipartUploadRequest
.get(multipartKey));
}
-
-
-
- private OMRequest doPreExecute(String volumeName, String bucketName,
- String keyName) {
- OMRequest omRequest =
- TestOMRequestUtils.createInitiateMPURequest(volumeName, bucketName,
- keyName);
-
- S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
- new S3InitiateMultipartUploadRequest(omRequest);
-
- OMRequest modifiedRequest =
- s3InitiateMultipartUploadRequest.preExecute(ozoneManager);
-
- Assert.assertNotEquals(omRequest, modifiedRequest);
- Assert.assertTrue(modifiedRequest.hasInitiateMultiPartUploadRequest());
- Assert.assertNotNull(modifiedRequest.getInitiateMultiPartUploadRequest()
- .getKeyArgs().getMultipartUploadID());
- Assert.assertTrue(modifiedRequest.getInitiateMultiPartUploadRequest()
- .getKeyArgs().getModificationTime() > 0);
-
- return modifiedRequest;
- }
}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
index 15b642b..d53235a 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
@@ -19,7 +19,11 @@
package org.apache.hadoop.ozone.om.request.s3.multipart;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .OMRequest;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
@@ -73,4 +77,68 @@ public class TestS3MultipartRequest {
Mockito.framework().clearInlineMocks();
}
+ /**
+ * Perform preExecute of Initiate Multipart upload request for given
+ * volume, bucket and key name.
+ * @param volumeName
+ * @param bucketName
+ * @param keyName
+ * @return OMRequest - returned from preExecute.
+ */
+ protected OMRequest doPreExecuteInitiateMPU(
+ String volumeName, String bucketName, String keyName) {
+ OMRequest omRequest =
+ TestOMRequestUtils.createInitiateMPURequest(volumeName, bucketName,
+ keyName);
+
+ S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
+ new S3InitiateMultipartUploadRequest(omRequest);
+
+ OMRequest modifiedRequest =
+ s3InitiateMultipartUploadRequest.preExecute(ozoneManager);
+
+ Assert.assertNotEquals(omRequest, modifiedRequest);
+ Assert.assertTrue(modifiedRequest.hasInitiateMultiPartUploadRequest());
+ Assert.assertNotNull(modifiedRequest.getInitiateMultiPartUploadRequest()
+ .getKeyArgs().getMultipartUploadID());
+ Assert.assertTrue(modifiedRequest.getInitiateMultiPartUploadRequest()
+ .getKeyArgs().getModificationTime() > 0);
+
+ return modifiedRequest;
+ }
+
+ /**
+ * Perform preExecute of Commit Multipart Upload request for given volume,
+ * bucket and keyName.
+ * @param volumeName
+ * @param bucketName
+ * @param keyName
+ * @param clientID
+ * @param multipartUploadID
+ * @param partNumber
+ * @return OMRequest - returned from preExecute.
+ */
+ protected OMRequest doPreExecuteCommitMPU(
+ String volumeName, String bucketName, String keyName,
+ long clientID, String multipartUploadID, int partNumber) {
+
+ // Just set dummy size
+ long dataSize = 100L;
+ OMRequest omRequest =
+ TestOMRequestUtils.createCommitPartMPURequest(volumeName, bucketName,
+ keyName, clientID, dataSize, multipartUploadID, partNumber);
+ S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest =
+ new S3MultipartUploadCommitPartRequest(omRequest);
+
+
+ OMRequest modifiedRequest =
+ s3MultipartUploadCommitPartRequest.preExecute(ozoneManager);
+
+    // UserInfo and modification time are set.
+ Assert.assertNotEquals(omRequest, modifiedRequest);
+
+ return modifiedRequest;
+ }
+
+
}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequest.java
new file mode 100644
index 0000000..19d985d
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.request.s3.multipart;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .OMRequest;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.UUID;
+
+/**
+ * Tests S3 Multipart upload commit part request.
+ */
+public class TestS3MultipartUploadCommitPartRequest
+ extends TestS3MultipartRequest {
+
+ @Test
+ public void testPreExecute() {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ String keyName = UUID.randomUUID().toString();
+
+ doPreExecuteCommitMPU(volumeName, bucketName, keyName, Time.now(),
+ UUID.randomUUID().toString(), 1);
+ }
+
+
+ @Test
+ public void testValidateAndUpdateCacheSuccess() throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ String keyName = UUID.randomUUID().toString();
+
+ TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+ omMetadataManager);
+
+ OMRequest initiateMPURequest = doPreExecuteInitiateMPU(volumeName,
+ bucketName, keyName);
+
+ S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
+ new S3InitiateMultipartUploadRequest(initiateMPURequest);
+
+ OMClientResponse omClientResponse =
+ s3InitiateMultipartUploadRequest.validateAndUpdateCache(ozoneManager,
+ 1L);
+
+ long clientID = Time.now();
+ String multipartUploadID = omClientResponse.getOMResponse()
+ .getInitiateMultiPartUploadResponse().getMultipartUploadID();
+
+ OMRequest commitMultipartRequest = doPreExecuteCommitMPU(volumeName,
+ bucketName, keyName, clientID, multipartUploadID, 1);
+
+ S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest =
+ new S3MultipartUploadCommitPartRequest(commitMultipartRequest);
+
+ // Add key to open key table.
+ TestOMRequestUtils.addKeyToTable(true, volumeName, bucketName,
+ keyName, clientID, HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.ONE, omMetadataManager);
+
+ omClientResponse =
+ s3MultipartUploadCommitPartRequest.validateAndUpdateCache(ozoneManager,
+ 2L);
+
+
+ Assert.assertTrue(omClientResponse.getOMResponse().getStatus()
+ == OzoneManagerProtocolProtos.Status.OK);
+
+ String multipartKey = omMetadataManager.getMultipartKey(volumeName,
+ bucketName, keyName, multipartUploadID);
+
+ Assert.assertNotNull(
+ omMetadataManager.getMultipartInfoTable().get(multipartKey));
+ Assert.assertTrue(omMetadataManager.getMultipartInfoTable()
+ .get(multipartKey).getPartKeyInfoMap().size() == 1);
+ Assert.assertNull(omMetadataManager.getOpenKeyTable()
+ .get(omMetadataManager.getOpenKey(volumeName, bucketName, keyName,
+ clientID)));
+
+ }
+
+ @Test
+ public void testValidateAndUpdateCacheMultipartNotFound() throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ String keyName = UUID.randomUUID().toString();
+
+ TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+ omMetadataManager);
+
+
+ long clientID = Time.now();
+ String multipartUploadID = UUID.randomUUID().toString();
+
+ OMRequest commitMultipartRequest = doPreExecuteCommitMPU(volumeName,
+ bucketName, keyName, clientID, multipartUploadID, 1);
+
+ S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest =
+ new S3MultipartUploadCommitPartRequest(commitMultipartRequest);
+
+ // Add key to open key table.
+ TestOMRequestUtils.addKeyToTable(true, volumeName, bucketName,
+ keyName, clientID, HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.ONE, omMetadataManager);
+
+ OMClientResponse omClientResponse =
+ s3MultipartUploadCommitPartRequest.validateAndUpdateCache(ozoneManager,
+ 2L);
+
+
+ Assert.assertTrue(omClientResponse.getOMResponse().getStatus()
+ == OzoneManagerProtocolProtos.Status.NO_SUCH_MULTIPART_UPLOAD_ERROR);
+
+ String multipartKey = omMetadataManager.getMultipartKey(volumeName,
+ bucketName, keyName, multipartUploadID);
+
+ Assert.assertNull(
+ omMetadataManager.getMultipartInfoTable().get(multipartKey));
+
+ }
+
+ @Test
+ public void testValidateAndUpdateCacheKeyNotFound() throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ String keyName = UUID.randomUUID().toString();
+
+ TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+ omMetadataManager);
+
+
+ long clientID = Time.now();
+ String multipartUploadID = UUID.randomUUID().toString();
+
+ OMRequest commitMultipartRequest = doPreExecuteCommitMPU(volumeName,
+ bucketName, keyName, clientID, multipartUploadID, 1);
+
+    // Don't add the key to the open key table; committing this MPU part
+    // will then fail with KEY_NOT_FOUND.
+
+ S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest =
+ new S3MultipartUploadCommitPartRequest(commitMultipartRequest);
+
+
+ OMClientResponse omClientResponse =
+ s3MultipartUploadCommitPartRequest.validateAndUpdateCache(ozoneManager,
+ 2L);
+
+ Assert.assertTrue(omClientResponse.getOMResponse().getStatus()
+ == OzoneManagerProtocolProtos.Status.KEY_NOT_FOUND);
+
+ }
+
+
+ @Test
+  public void testValidateAndUpdateCacheBucketNotFound() throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ String keyName = UUID.randomUUID().toString();
+
+ TestOMRequestUtils.addVolumeToDB(volumeName, omMetadataManager);
+
+
+ long clientID = Time.now();
+ String multipartUploadID = UUID.randomUUID().toString();
+
+ OMRequest commitMultipartRequest = doPreExecuteCommitMPU(volumeName,
+ bucketName, keyName, clientID, multipartUploadID, 1);
+
+    // The bucket is not added to the DB, so committing this MPU part will
+    // fail with BUCKET_NOT_FOUND.
+
+ S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest =
+ new S3MultipartUploadCommitPartRequest(commitMultipartRequest);
+
+
+ OMClientResponse omClientResponse =
+ s3MultipartUploadCommitPartRequest.validateAndUpdateCache(ozoneManager,
+ 2L);
+
+ Assert.assertTrue(omClientResponse.getOMResponse().getStatus()
+ == OzoneManagerProtocolProtos.Status.BUCKET_NOT_FOUND);
+
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]