This is an automated email from the ASF dual-hosted git repository. rakeshr pushed a commit to branch HDDS-2939 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 52cc11261df92c7fb7c1659b6046c8d4e5fbe176 Author: Rakesh Radhakrishnan <[email protected]> AuthorDate: Wed Feb 17 20:50:05 2021 +0530 HDDS-4813. [FSO]S3Multipart: Implement UploadCompleteRequest (#1923) --- .../rpc/TestOzoneClientMultipartUploadV1.java | 274 ++++++++++++++-- .../om/ratis/utils/OzoneManagerRatisUtils.java | 4 + .../ozone/om/request/file/OMFileRequest.java | 4 +- .../S3MultipartUploadCommitPartRequestV1.java | 39 ++- .../S3MultipartUploadCompleteRequest.java | 349 +++++++++++++-------- .../S3MultipartUploadCompleteRequestV1.java | 268 ++++++++++++++++ .../S3MultipartUploadCompleteResponse.java | 13 +- ...va => S3MultipartUploadCompleteResponseV1.java} | 60 ++-- .../s3/multipart/TestS3MultipartRequest.java | 9 +- .../TestS3MultipartUploadCompleteRequest.java | 118 ++++++- .../TestS3MultipartUploadCompleteRequestV1.java | 132 ++++++++ .../s3/multipart/TestS3MultipartResponse.java | 21 ++ .../TestS3MultipartUploadCompleteResponseV1.java | 257 +++++++++++++++ 13 files changed, 1330 insertions(+), 218 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientMultipartUploadV1.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientMultipartUploadV1.java index af241c5..1ab2cc3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientMultipartUploadV1.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientMultipartUploadV1.java @@ -17,24 +17,29 @@ package org.apache.hadoop.ozone.client.rpc; +import org.apache.commons.lang3.RandomUtils; +import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneTestUtils; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.hdds.StringUtils.string2Bytes; import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; +import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; import org.apache.hadoop.ozone.om.request.TestOMRequestUtils; import org.junit.AfterClass; import org.junit.Assert; @@ -44,10 +49,15 @@ import org.junit.Test; import org.junit.rules.Timeout; import java.io.IOException; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.TreeMap; import java.util.UUID; import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE; import static org.apache.hadoop.hdds.client.ReplicationType.STAND_ALONE; +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR; /** * This test verifies all the S3 multipart client apis - layout version V1. @@ -133,24 +143,24 @@ public class TestOzoneClientMultipartUploadV1 { OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName, STAND_ALONE, ONE); - assertNotNull(multipartInfo); + Assert.assertNotNull(multipartInfo); String uploadID = multipartInfo.getUploadID(); Assert.assertEquals(volumeName, multipartInfo.getVolumeName()); Assert.assertEquals(bucketName, multipartInfo.getBucketName()); Assert.assertEquals(keyName, multipartInfo.getKeyName()); - assertNotNull(multipartInfo.getUploadID()); + Assert.assertNotNull(multipartInfo.getUploadID()); // Call initiate multipart upload for the same key again, this should // generate a new uploadID. multipartInfo = bucket.initiateMultipartUpload(keyName, STAND_ALONE, ONE); - assertNotNull(multipartInfo); + Assert.assertNotNull(multipartInfo); Assert.assertEquals(volumeName, multipartInfo.getVolumeName()); Assert.assertEquals(bucketName, multipartInfo.getBucketName()); Assert.assertEquals(keyName, multipartInfo.getKeyName()); - assertNotEquals(multipartInfo.getUploadID(), uploadID); - assertNotNull(multipartInfo.getUploadID()); + Assert.assertNotEquals(multipartInfo.getUploadID(), uploadID); + Assert.assertNotNull(multipartInfo.getUploadID()); } @Test @@ -166,23 +176,23 @@ public class TestOzoneClientMultipartUploadV1 { OzoneBucket bucket = volume.getBucket(bucketName); OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName); - assertNotNull(multipartInfo); + Assert.assertNotNull(multipartInfo); String uploadID = multipartInfo.getUploadID(); Assert.assertEquals(volumeName, multipartInfo.getVolumeName()); Assert.assertEquals(bucketName, multipartInfo.getBucketName()); Assert.assertEquals(keyName, multipartInfo.getKeyName()); - assertNotNull(multipartInfo.getUploadID()); + Assert.assertNotNull(multipartInfo.getUploadID()); // Call initiate multipart upload for the same key again, this should // generate a new uploadID. multipartInfo = bucket.initiateMultipartUpload(keyName); - assertNotNull(multipartInfo); + Assert.assertNotNull(multipartInfo); Assert.assertEquals(volumeName, multipartInfo.getVolumeName()); Assert.assertEquals(bucketName, multipartInfo.getBucketName()); Assert.assertEquals(keyName, multipartInfo.getKeyName()); - assertNotEquals(multipartInfo.getUploadID(), uploadID); - assertNotNull(multipartInfo.getUploadID()); + Assert.assertNotEquals(multipartInfo.getUploadID(), uploadID); + Assert.assertNotNull(multipartInfo.getUploadID()); } @Test @@ -199,12 +209,12 @@ public class TestOzoneClientMultipartUploadV1 { OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName, STAND_ALONE, ONE); - assertNotNull(multipartInfo); + Assert.assertNotNull(multipartInfo); String uploadID = multipartInfo.getUploadID(); Assert.assertEquals(volumeName, multipartInfo.getVolumeName()); Assert.assertEquals(bucketName, multipartInfo.getBucketName()); Assert.assertEquals(keyName, multipartInfo.getKeyName()); - assertNotNull(multipartInfo.getUploadID()); + Assert.assertNotNull(multipartInfo.getUploadID()); OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName, sampleData.length(), 1, uploadID); @@ -214,8 +224,8 @@ public class TestOzoneClientMultipartUploadV1 { OmMultipartCommitUploadPartInfo commitUploadPartInfo = ozoneOutputStream .getCommitUploadPartInfo(); - assertNotNull(commitUploadPartInfo); - assertNotNull(commitUploadPartInfo.getPartName()); + Assert.assertNotNull(commitUploadPartInfo); + Assert.assertNotNull(commitUploadPartInfo.getPartName()); } @Test @@ -233,12 +243,12 @@ public class TestOzoneClientMultipartUploadV1 { OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName, ReplicationType.RATIS, THREE); - assertNotNull(multipartInfo); + Assert.assertNotNull(multipartInfo); String uploadID = multipartInfo.getUploadID(); Assert.assertEquals(volumeName, multipartInfo.getVolumeName()); Assert.assertEquals(bucketName, multipartInfo.getBucketName()); Assert.assertEquals(keyName, multipartInfo.getKeyName()); - assertNotNull(multipartInfo.getUploadID()); + Assert.assertNotNull(multipartInfo.getUploadID()); int partNumber = 1; @@ -250,9 +260,9 @@ public class TestOzoneClientMultipartUploadV1 { OmMultipartCommitUploadPartInfo commitUploadPartInfo = ozoneOutputStream .getCommitUploadPartInfo(); - assertNotNull(commitUploadPartInfo); + Assert.assertNotNull(commitUploadPartInfo); String partName = commitUploadPartInfo.getPartName(); - assertNotNull(commitUploadPartInfo.getPartName()); + Assert.assertNotNull(commitUploadPartInfo.getPartName()); //Overwrite the part by creating part key with same part number. sampleData = "sample Data Changed"; @@ -264,12 +274,230 @@ public class TestOzoneClientMultipartUploadV1 { commitUploadPartInfo = ozoneOutputStream .getCommitUploadPartInfo(); - assertNotNull(commitUploadPartInfo); - assertNotNull(commitUploadPartInfo.getPartName()); + Assert.assertNotNull(commitUploadPartInfo); + Assert.assertNotNull(commitUploadPartInfo.getPartName()); // PartName should be different from old part Name. - assertNotEquals("Part names should be different", partName, + Assert.assertNotEquals("Part names should be different", partName, commitUploadPartInfo.getPartName()); } + @Test + public void testMultipartUploadWithPartsLessThanMinSize() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + + // Initiate multipart upload + String uploadID = initiateMultipartUpload(bucket, keyName, STAND_ALONE, + ONE); + + // Upload Parts + Map<Integer, String> partsMap = new TreeMap<>(); + // Uploading part 1 with less than min size + String partName = uploadPart(bucket, keyName, uploadID, 1, + "data".getBytes(UTF_8)); + partsMap.put(1, partName); + + partName = uploadPart(bucket, keyName, uploadID, 2, + "data".getBytes(UTF_8)); + partsMap.put(2, partName); + + // Complete multipart upload + OzoneTestUtils.expectOmException(OMException.ResultCodes.ENTITY_TOO_SMALL, + () -> completeMultipartUpload(bucket, keyName, uploadID, partsMap)); + } + + @Test + public void testMultipartUploadWithPartsMisMatchWithListSizeDifferent() + throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + + String uploadID = initiateMultipartUpload(bucket, keyName, STAND_ALONE, + ONE); + + // We have not uploaded any parts, but passing some list it should throw + // error. + TreeMap<Integer, String> partsMap = new TreeMap<>(); + partsMap.put(1, UUID.randomUUID().toString()); + + OzoneTestUtils.expectOmException(OMException.ResultCodes.INVALID_PART, + () -> completeMultipartUpload(bucket, keyName, uploadID, partsMap)); + } + + @Test + public void testMultipartUploadWithPartsMisMatchWithIncorrectPartName() + throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + + String uploadID = initiateMultipartUpload(bucket, keyName, STAND_ALONE, + ONE); + + uploadPart(bucket, keyName, uploadID, 1, "data".getBytes(UTF_8)); + + // passing with an incorrect part name, should throw INVALID_PART error. + TreeMap<Integer, String> partsMap = new TreeMap<>(); + partsMap.put(1, UUID.randomUUID().toString()); + + OzoneTestUtils.expectOmException(OMException.ResultCodes.INVALID_PART, + () -> completeMultipartUpload(bucket, keyName, uploadID, partsMap)); + } + + @Test + public void testMultipartUploadWithMissingParts() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = UUID.randomUUID().toString(); + + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + + String uploadID = initiateMultipartUpload(bucket, keyName, STAND_ALONE, + ONE); + + uploadPart(bucket, keyName, uploadID, 1, "data".getBytes(UTF_8)); + + // passing with an incorrect part number, should throw INVALID_PART error. + TreeMap<Integer, String> partsMap = new TreeMap<>(); + partsMap.put(3, "random"); + + OzoneTestUtils.expectOmException(OMException.ResultCodes.INVALID_PART, + () -> completeMultipartUpload(bucket, keyName, uploadID, partsMap)); + } + + @Test + public void testCommitPartAfterCompleteUpload() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String parentDir = "a/b/c/d/"; + String keyName = parentDir + UUID.randomUUID().toString(); + + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + + OmMultipartInfo omMultipartInfo = bucket.initiateMultipartUpload(keyName, + STAND_ALONE, ONE); + + Assert.assertNotNull(omMultipartInfo.getUploadID()); + + String uploadID = omMultipartInfo.getUploadID(); + + // upload part 1. + byte[] data = generateData(5 * 1024 * 1024, + (byte) RandomUtils.nextLong()); + OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName, + data.length, 1, uploadID); + ozoneOutputStream.write(data, 0, data.length); + ozoneOutputStream.close(); + + OmMultipartCommitUploadPartInfo omMultipartCommitUploadPartInfo = + ozoneOutputStream.getCommitUploadPartInfo(); + + // Do not close output stream for part 2. + ozoneOutputStream = bucket.createMultipartKey(keyName, + data.length, 2, omMultipartInfo.getUploadID()); + ozoneOutputStream.write(data, 0, data.length); + + Map<Integer, String> partsMap = new LinkedHashMap<>(); + partsMap.put(1, omMultipartCommitUploadPartInfo.getPartName()); + OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo = + bucket.completeMultipartUpload(keyName, + uploadID, partsMap); + Assert.assertNotNull(omMultipartUploadCompleteInfo); + + Assert.assertNotNull(omMultipartCommitUploadPartInfo); + + byte[] fileContent = new byte[data.length]; + OzoneInputStream inputStream = bucket.readKey(keyName); + inputStream.read(fileContent); + StringBuilder sb = new StringBuilder(data.length); + + // Combine all parts data, and check is it matching with get key data. + String part1 = new String(data, UTF_8); + sb.append(part1); + Assert.assertEquals(sb.toString(), new String(fileContent, UTF_8)); + + try { + ozoneOutputStream.close(); + Assert.fail("testCommitPartAfterCompleteUpload failed"); + } catch (IOException ex) { + Assert.assertTrue(ex instanceof OMException); + Assert.assertEquals(NO_SUCH_MULTIPART_UPLOAD_ERROR, + ((OMException) ex).getResult()); + } + } + + private String initiateMultipartUpload(OzoneBucket bucket, String keyName, + ReplicationType replicationType, ReplicationFactor replicationFactor) + throws Exception { + OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName, + replicationType, replicationFactor); + + String uploadID = multipartInfo.getUploadID(); + Assert.assertNotNull(uploadID); + + return uploadID; + } + + private String uploadPart(OzoneBucket bucket, String keyName, String + uploadID, int partNumber, byte[] data) throws Exception { + + OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName, + data.length, partNumber, uploadID); + ozoneOutputStream.write(data, 0, + data.length); + ozoneOutputStream.close(); + + OmMultipartCommitUploadPartInfo omMultipartCommitUploadPartInfo = + ozoneOutputStream.getCommitUploadPartInfo(); + + Assert.assertNotNull(omMultipartCommitUploadPartInfo); + Assert.assertNotNull(omMultipartCommitUploadPartInfo.getPartName()); + + return omMultipartCommitUploadPartInfo.getPartName(); + } + + private void completeMultipartUpload(OzoneBucket bucket, String keyName, + String uploadID, Map<Integer, String> partsMap) throws Exception { + OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo = bucket + .completeMultipartUpload(keyName, uploadID, partsMap); + + Assert.assertNotNull(omMultipartUploadCompleteInfo); + Assert.assertEquals(omMultipartUploadCompleteInfo.getBucket(), bucket + .getName()); + Assert.assertEquals(omMultipartUploadCompleteInfo.getVolume(), bucket + .getVolumeName()); + Assert.assertEquals(omMultipartUploadCompleteInfo.getKey(), keyName); + Assert.assertNotNull(omMultipartUploadCompleteInfo.getHash()); + } + + private byte[] generateData(int size, byte val) { + byte[] chars = new byte[size]; + Arrays.fill(chars, val); + return chars; + } + } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java index 40a5396..2b39b8d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java @@ -62,6 +62,7 @@ import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadAbortReq import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCommitPartRequest; import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCommitPartRequestV1; import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCompleteRequest; +import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCompleteRequestV1; import org.apache.hadoop.ozone.om.request.s3.security.S3GetSecretRequest; import org.apache.hadoop.ozone.om.request.security.OMCancelDelegationTokenRequest; import org.apache.hadoop.ozone.om.request.security.OMGetDelegationTokenRequest; @@ -196,6 +197,9 @@ public final class OzoneManagerRatisUtils { case AbortMultiPartUpload: return new S3MultipartUploadAbortRequest(omRequest); case CompleteMultiPartUpload: + if (isBucketFSOptimized()) { + return new S3MultipartUploadCompleteRequestV1(omRequest); + } return new S3MultipartUploadCompleteRequest(omRequest); case AddAcl: case RemoveAcl: diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java index 7f2d2c5..ebf86ce 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java @@ -542,9 +542,10 @@ public final class OMFileRequest { * @param omMetadataMgr * @param batchOp * @param omFileInfo + * @return db file key * @throws IOException */ - public static void addToFileTable(OMMetadataManager omMetadataMgr, + public static String addToFileTable(OMMetadataManager omMetadataMgr, BatchOperation batchOp, OmKeyInfo omFileInfo) throws IOException { @@ -554,6 +555,7 @@ public final class OMFileRequest { omMetadataMgr.getKeyTable().putWithBatch(batchOp, dbFileKey, omFileInfo); + return dbFileKey; } /** diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequestV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequestV1.java index 5546010..7aa21cf 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequestV1.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequestV1.java @@ -86,7 +86,8 @@ public class S3MultipartUploadCommitPartRequestV1 boolean acquiredLock = false; IOException exception = null; - String partName = null; + String dbPartName; + String fullKeyPartName = null; OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder( getOmRequest()); OMClientResponse omClientResponse = null; @@ -95,8 +96,8 @@ public class S3MultipartUploadCommitPartRequestV1 OmKeyInfo omKeyInfo = null; String multipartKey = null; OmMultipartKeyInfo multipartKeyInfo = null; - Result result = null; - OmBucketInfo omBucketInfo = null; + Result result; + OmBucketInfo omBucketInfo; OmBucketInfo copyBucketInfo = null; try { keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap); @@ -141,14 +142,24 @@ public class S3MultipartUploadCommitPartRequestV1 omKeyInfo.setDataSize(keyArgs.getDataSize()); omKeyInfo.updateLocationInfoList(keyArgs.getKeyLocationsList().stream() .map(OmKeyLocationInfo::getFromProtobuf) - .collect(Collectors.toList())); + .collect(Collectors.toList()), true); // Set Modification time omKeyInfo.setModificationTime(keyArgs.getModificationTime()); // Set the UpdateID to current transactionLogIndex omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled()); - String ozoneKey = omMetadataManager.getOzonePathKey(parentID, fileName); - partName = ozoneKey + clientID; + /** + * Format of PartName stored into MultipartInfoTable is, + * "fileName + ClientID". + * + * Contract is that all part names present in a multipart info will + * have same key prefix path. + * + * For example: + * /vol1/buck1/a/b/c/part-1, /vol1/buck1/a/b/c/part-2, + * /vol1/buck1/a/b/c/part-n + */ + dbPartName = fileName + clientID; if (multipartKeyInfo == null) { // This can occur when user started uploading part by the time commit @@ -168,9 +179,9 @@ public class S3MultipartUploadCommitPartRequestV1 // Build this multipart upload part info. OzoneManagerProtocolProtos.PartKeyInfo.Builder partKeyInfo = OzoneManagerProtocolProtos.PartKeyInfo.newBuilder(); - partKeyInfo.setPartName(partName); + partKeyInfo.setPartName(dbPartName); partKeyInfo.setPartNumber(partNumber); - partKeyInfo.setPartKeyInfo(omKeyInfo.getProtobuf( + partKeyInfo.setPartKeyInfo(omKeyInfo.getProtobuf(fileName, getOmRequest().getVersion())); // Add this part information in to multipartKeyInfo. @@ -207,9 +218,15 @@ public class S3MultipartUploadCommitPartRequestV1 keyArgs.getKeyLocationsList().size() * scmBlockSize * factor; omBucketInfo.incrUsedBytes(correctedSpace); + // Prepare response. Sets user given full key part name in 'partName' + // attribute in response object. + String fullOzoneKeyName = omMetadataManager.getOzoneKey( + volumeName, bucketName, keyName); + fullKeyPartName = fullOzoneKeyName + clientID; omResponse.setCommitMultiPartUploadResponse( MultipartCommitUploadPartResponse.newBuilder() - .setPartName(partName)); + .setPartName(fullKeyPartName)); + omClientResponse = new S3MultipartUploadCommitPartResponseV1( omResponse.build(), multipartKey, openKey, multipartKeyInfo, oldPartKeyInfo, omKeyInfo, @@ -234,8 +251,8 @@ public class S3MultipartUploadCommitPartRequestV1 } logResult(ozoneManager, multipartCommitUploadPartRequest, keyArgs, - auditMap, volumeName, bucketName, keyName, exception, partName, - result); + auditMap, volumeName, bucketName, keyName, exception, + fullKeyPartName, result); return omClientResponse; } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java index 1e4d25c..9480e97 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java @@ -18,14 +18,12 @@ package org.apache.hadoop.ozone.om.request.s3.multipart; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - +import com.google.common.base.Optional; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.utils.db.cache.CacheKey; +import org.apache.hadoop.hdds.utils.db.cache.CacheValue; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.audit.OMAction; import org.apache.hadoop.ozone.om.OMMetadataManager; @@ -35,7 +33,10 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil; +import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils; import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.om.request.key.OMKeyRequest; import org.apache.hadoop.ozone.om.request.util.OmResponseUtil; import org.apache.hadoop.ozone.om.response.OMClientResponse; @@ -48,15 +49,19 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMReque import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo; import org.apache.hadoop.util.Time; -import org.apache.hadoop.hdds.utils.db.cache.CacheKey; -import org.apache.hadoop.hdds.utils.db.cache.CacheValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import com.google.common.base.Optional; -import org.apache.commons.codec.digest.DigestUtils; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import static org.apache.hadoop.ozone.OzoneConsts.OM_MULTIPART_MIN_SIZE; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE; import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Handle Multipart upload complete request. @@ -168,129 +173,25 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest { } // First Check for Invalid Part Order. - int prevPartNumber = partsList.get(0).getPartNumber(); List< Integer > partNumbers = new ArrayList<>(); - int partsListSize = partsList.size(); - partNumbers.add(prevPartNumber); - for (int i = 1; i < partsListSize; i++) { - int currentPartNumber = partsList.get(i).getPartNumber(); - if (prevPartNumber >= currentPartNumber) { - LOG.error("PartNumber at index {} is {}, and its previous " + - "partNumber at index {} is {} for ozonekey is " + - "{}", i, currentPartNumber, i - 1, prevPartNumber, - ozoneKey); - throw new OMException( - failureMessage(requestedVolume, requestedBucket, keyName) + - " because parts are in Invalid order.", - OMException.ResultCodes.INVALID_PART_ORDER); - } - prevPartNumber = currentPartNumber; - partNumbers.add(prevPartNumber); - } - + int partsListSize = getPartsListSize(requestedVolume, + requestedBucket, keyName, ozoneKey, partNumbers, partsList); List<OmKeyLocationInfo> partLocationInfos = new ArrayList<>(); - long dataSize = 0; - int currentPartCount = 0; - // Now do actual logic, and check for any Invalid part during this. - for (OzoneManagerProtocolProtos.Part part : partsList) { - currentPartCount++; - int partNumber = part.getPartNumber(); - String partName = part.getPartName(); - - PartKeyInfo partKeyInfo = partKeyInfoMap.get(partNumber); - - if (partKeyInfo == null || - !partName.equals(partKeyInfo.getPartName())) { - String omPartName = partKeyInfo == null ? null : - partKeyInfo.getPartName(); - throw new OMException( - failureMessage(requestedVolume, requestedBucket, keyName) + - ". Provided Part info is { " + partName + ", " + partNumber + - "}, whereas OM has partName " + omPartName, - OMException.ResultCodes.INVALID_PART); - } - - OmKeyInfo currentPartKeyInfo = OmKeyInfo - .getFromProtobuf(partKeyInfo.getPartKeyInfo()); - - // Except for last part all parts should have minimum size. - if (currentPartCount != partsListSize) { - if (currentPartKeyInfo.getDataSize() < - ozoneManager.getMinMultipartUploadPartSize()) { - LOG.error("MultipartUpload: {} Part number: {} size {} is less" + - " than minimum part size {}", ozoneKey, - partKeyInfo.getPartNumber(), currentPartKeyInfo.getDataSize(), - ozoneManager.getMinMultipartUploadPartSize()); - throw new OMException( - failureMessage(requestedVolume, requestedBucket, keyName) + - ". Entity too small.", - OMException.ResultCodes.ENTITY_TOO_SMALL); - } - } - - // As all part keys will have only one version. - OmKeyLocationInfoGroup currentKeyInfoGroup = currentPartKeyInfo - .getKeyLocationVersions().get(0); - - // Set partNumber in each block. - currentKeyInfoGroup.getLocationList().forEach( - omKeyLocationInfo -> omKeyLocationInfo.setPartNumber(partNumber)); - - partLocationInfos.addAll(currentKeyInfoGroup.getLocationList()); - dataSize += currentPartKeyInfo.getDataSize(); - } + long dataSize = getMultipartDataSize(requestedVolume, requestedBucket, + keyName, ozoneKey, partKeyInfoMap, partsListSize, + partLocationInfos, partsList, ozoneManager); // All parts have same replication information. Here getting from last // part. - HddsProtos.ReplicationType type = partKeyInfoMap.lastEntry().getValue() - .getPartKeyInfo().getType(); - HddsProtos.ReplicationFactor factor = - partKeyInfoMap.lastEntry().getValue().getPartKeyInfo().getFactor(); - - OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(ozoneKey); - if (omKeyInfo == null) { - // This is a newly added key, it does not have any versions. - OmKeyLocationInfoGroup keyLocationInfoGroup = new - OmKeyLocationInfoGroup(0, partLocationInfos, true); - - // Get the objectID of the key from OpenKeyTable - OmKeyInfo dbOpenKeyInfo = omMetadataManager.getOpenKeyTable() - .get(multipartKey); - - // A newly created key, this is the first version. - OmKeyInfo.Builder builder = - new OmKeyInfo.Builder().setVolumeName(volumeName) - .setBucketName(bucketName).setKeyName(keyName) - .setReplicationFactor(factor).setReplicationType(type) - .setCreationTime(keyArgs.getModificationTime()) - .setModificationTime(keyArgs.getModificationTime()) - .setDataSize(dataSize) - .setFileEncryptionInfo(dbOpenKeyInfo.getFileEncryptionInfo()) - .setOmKeyLocationInfos( - Collections.singletonList(keyLocationInfoGroup)) - .setAcls(dbOpenKeyInfo.getAcls()); - // Check if db entry has ObjectID. This check is required because - // it is possible that between multipart key uploads and complete, - // we had an upgrade. - if (dbOpenKeyInfo.getObjectID() != 0) { - builder.setObjectID(dbOpenKeyInfo.getObjectID()); - } - omKeyInfo = builder.build(); - } else { - // Already a version exists, so we should add it as a new version. - // But now as versioning is not supported, just following the commit - // key approach. When versioning support comes, then we can uncomment - // below code keyInfo.addNewVersion(locations); - omKeyInfo.updateLocationInfoList(partLocationInfos, true); - omKeyInfo.setModificationTime(keyArgs.getModificationTime()); - omKeyInfo.setDataSize(dataSize); - } - omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled()); + OmKeyInfo omKeyInfo = getOmKeyInfo(ozoneManager, trxnLogIndex, keyArgs, + volumeName, bucketName, keyName, multipartKey, + omMetadataManager, ozoneKey, partKeyInfoMap, partLocationInfos, + dataSize); //Find all unused parts. - List< OmKeyInfo > unUsedParts = new ArrayList<>(); - for (Map.Entry< Integer, PartKeyInfo > partKeyInfo : + List<OmKeyInfo> unUsedParts = new ArrayList<>(); + for (Map.Entry< Integer, PartKeyInfo> partKeyInfo : partKeyInfoMap.entrySet()) { if (!partNumbers.contains(partKeyInfo.getKey())) { unUsedParts.add(OmKeyInfo @@ -333,6 +234,19 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest { } } + logResult(ozoneManager, multipartUploadCompleteRequest, partsList, + auditMap, volumeName, bucketName, keyName, exception, result); + + return omClientResponse; + } + + @SuppressWarnings("checkstyle:ParameterNumber") + protected void logResult(OzoneManager ozoneManager, + MultipartUploadCompleteRequest multipartUploadCompleteRequest, + List<OzoneManagerProtocolProtos.Part> partsList, + Map<String, String> auditMap, String volumeName, + String bucketName, String keyName, IOException exception, + Result result) { auditMap.put(OzoneConsts.MULTIPART_LIST, partsList.toString()); // audit log @@ -355,8 +269,183 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest { LOG.error("Unrecognized Result for S3MultipartUploadCommitRequest: {}", multipartUploadCompleteRequest); } + } - return omClientResponse; + @SuppressWarnings("checkstyle:ParameterNumber") + protected OmKeyInfo getOmKeyInfo(OzoneManager ozoneManager, long trxnLogIndex, + KeyArgs keyArgs, String volumeName, String bucketName, String keyName, + String multipartKey, OMMetadataManager omMetadataManager, + String ozoneKey, TreeMap<Integer, PartKeyInfo> partKeyInfoMap, + List<OmKeyLocationInfo> partLocationInfos, long dataSize) + throws IOException { + HddsProtos.ReplicationType type = partKeyInfoMap.lastEntry().getValue() + .getPartKeyInfo().getType(); + HddsProtos.ReplicationFactor factor = + partKeyInfoMap.lastEntry().getValue().getPartKeyInfo().getFactor(); + + OmKeyInfo omKeyInfo = getOmKeyInfoFromKeyTable(ozoneKey, keyName, + omMetadataManager); + if (omKeyInfo == null) { + // This is a newly added key, it does not have any versions. + OmKeyLocationInfoGroup keyLocationInfoGroup = new + OmKeyLocationInfoGroup(0, partLocationInfos, true); + + // Get the objectID of the key from OpenKeyTable + OmKeyInfo dbOpenKeyInfo = getOmKeyInfoFromOpenKeyTable(multipartKey, + keyName, omMetadataManager); + + // A newly created key, this is the first version. + OmKeyInfo.Builder builder = + new OmKeyInfo.Builder().setVolumeName(volumeName) + .setBucketName(bucketName).setKeyName(dbOpenKeyInfo.getKeyName()) + .setReplicationFactor(factor).setReplicationType(type) + .setCreationTime(keyArgs.getModificationTime()) + .setModificationTime(keyArgs.getModificationTime()) + .setDataSize(dataSize) + .setFileEncryptionInfo(dbOpenKeyInfo.getFileEncryptionInfo()) + .setOmKeyLocationInfos( + Collections.singletonList(keyLocationInfoGroup)) + .setAcls(dbOpenKeyInfo.getAcls()); + // Check if db entry has ObjectID. This check is required because + // it is possible that between multipart key uploads and complete, + // we had an upgrade. + if (dbOpenKeyInfo.getObjectID() != 0) { + builder.setObjectID(dbOpenKeyInfo.getObjectID()); + } + updatePrefixFSOInfo(dbOpenKeyInfo, builder); + omKeyInfo = builder.build(); + } else { + // Already a version exists, so we should add it as a new version. + // But now as versioning is not supported, just following the commit + // key approach. When versioning support comes, then we can uncomment + // below code keyInfo.addNewVersion(locations); + omKeyInfo.updateLocationInfoList(partLocationInfos, true); + omKeyInfo.setModificationTime(keyArgs.getModificationTime()); + omKeyInfo.setDataSize(dataSize); + } + omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled()); + return omKeyInfo; + } + + protected void updatePrefixFSOInfo(OmKeyInfo dbOpenKeyInfo, + OmKeyInfo.Builder builder) { + // FSOBucket is disabled. Do nothing. + } + + protected OmKeyInfo getOmKeyInfoFromKeyTable(String dbOzoneKey, + String keyName, OMMetadataManager omMetadataManager) throws IOException { + return omMetadataManager.getKeyTable().get(dbOzoneKey); + } + + protected OmKeyInfo getOmKeyInfoFromOpenKeyTable(String dbMultipartKey, + String keyName, OMMetadataManager omMetadataManager) throws IOException { + return omMetadataManager.getOpenKeyTable().get(dbMultipartKey); + } + + protected int getPartsListSize(String requestedVolume, + String requestedBucket, String keyName, String ozoneKey, + List<Integer> partNumbers, + List<OzoneManagerProtocolProtos.Part> partsList) throws OMException { + int prevPartNumber = partsList.get(0).getPartNumber(); + int partsListSize = partsList.size(); + partNumbers.add(prevPartNumber); + for (int i = 1; i < partsListSize; i++) { + int currentPartNumber = partsList.get(i).getPartNumber(); + if (prevPartNumber >= currentPartNumber) { + LOG.error("PartNumber at index {} is {}, and its previous " + + "partNumber at index {} is {} for ozonekey is " + + "{}", i, currentPartNumber, i - 1, prevPartNumber, + ozoneKey); + throw new OMException( + failureMessage(requestedVolume, requestedBucket, keyName) + + " because parts are in Invalid order.", + OMException.ResultCodes.INVALID_PART_ORDER); + } + prevPartNumber = currentPartNumber; + partNumbers.add(prevPartNumber); + } + return partsListSize; + } + + @SuppressWarnings("checkstyle:ParameterNumber") + protected long getMultipartDataSize(String requestedVolume, + String requestedBucket, String keyName, String ozoneKey, + TreeMap<Integer, PartKeyInfo> partKeyInfoMap, + int partsListSize, List<OmKeyLocationInfo> partLocationInfos, + List<OzoneManagerProtocolProtos.Part> partsList, + OzoneManager ozoneManager) throws OMException { + long dataSize = 0; + int currentPartCount = 0; + // Now do actual logic, and check for any Invalid part during this. + for (OzoneManagerProtocolProtos.Part part : partsList) { + currentPartCount++; + int partNumber = part.getPartNumber(); + String partName = part.getPartName(); + + PartKeyInfo partKeyInfo = partKeyInfoMap.get(partNumber); + + String dbPartName = null; + if (partKeyInfo != null) { + dbPartName = preparePartName(requestedVolume, requestedBucket, keyName, + partKeyInfo, ozoneManager.getMetadataManager()); + } + if (!StringUtils.equals(partName, dbPartName)) { + String omPartName = partKeyInfo == null ? null : dbPartName; + throw new OMException( + failureMessage(requestedVolume, requestedBucket, keyName) + + ". Provided Part info is { " + partName + ", " + partNumber + + "}, whereas OM has partName " + omPartName, + OMException.ResultCodes.INVALID_PART); + } + + OmKeyInfo currentPartKeyInfo = OmKeyInfo + .getFromProtobuf(partKeyInfo.getPartKeyInfo()); + + // Except for last part all parts should have minimum size. + if (currentPartCount != partsListSize) { + if (currentPartKeyInfo.getDataSize() < + ozoneManager.getMinMultipartUploadPartSize()) { + LOG.error("MultipartUpload: {} Part number: {} size {} is less" + + " than minimum part size {}", ozoneKey, + partKeyInfo.getPartNumber(), currentPartKeyInfo.getDataSize(), + ozoneManager.getMinMultipartUploadPartSize()); + throw new OMException( + failureMessage(requestedVolume, requestedBucket, keyName) + + ". Entity too small.", + OMException.ResultCodes.ENTITY_TOO_SMALL); + } + } + + // As all part keys will have only one version. + OmKeyLocationInfoGroup currentKeyInfoGroup = currentPartKeyInfo + .getKeyLocationVersions().get(0); + + // Set partNumber in each block. + currentKeyInfoGroup.getLocationList().forEach( + omKeyLocationInfo -> omKeyLocationInfo.setPartNumber(partNumber)); + + partLocationInfos.addAll(currentKeyInfoGroup.getLocationList()); + dataSize += currentPartKeyInfo.getDataSize(); + } + return dataSize; + } + + private String preparePartName(String requestedVolume, + String requestedBucket, String keyName, PartKeyInfo partKeyInfo, + OMMetadataManager omMetadataManager) { + + String partName; + if (OzoneManagerRatisUtils.isBucketFSOptimized()) { + String parentPath = OzoneFSUtils.getParent(keyName); + StringBuffer keyPath = new StringBuffer(parentPath); + keyPath.append(partKeyInfo.getPartName()); + + partName = omMetadataManager.getOzoneKey(requestedVolume, + requestedBucket, keyPath.toString()); + } else { + partName = partKeyInfo.getPartName(); + } + return partName; } private static String failureMessage(String volume, String bucket, @@ -365,7 +454,7 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest { volume + " bucket: " + bucket + " key: " + keyName; } - private void updateCache(OMMetadataManager omMetadataManager, + protected void updateCache(OMMetadataManager omMetadataManager, String ozoneKey, String multipartKey, OmKeyInfo omKeyInfo, long transactionLogIndex) { // Update cache. diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequestV1.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequestV1.java new file mode 100644 index 0000000..4ab9ee7 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequestV1.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.request.s3.multipart; + +import com.google.common.base.Optional; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.hadoop.hdds.utils.db.cache.CacheKey; +import org.apache.hadoop.hdds.utils.db.cache.CacheValue; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper; +import org.apache.hadoop.ozone.om.request.file.OMFileRequest; +import org.apache.hadoop.ozone.om.request.util.OmResponseUtil; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.om.response.s3.multipart.S3MultipartUploadCompleteResponseV1; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadCompleteRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadCompleteResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE; +import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK; +import static org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.DIRECTORY_EXISTS; + +/** + * Handle Multipart upload complete request. + */ +public class S3MultipartUploadCompleteRequestV1 + extends S3MultipartUploadCompleteRequest { + + private static final Logger LOG = + LoggerFactory.getLogger(S3MultipartUploadCompleteRequestV1.class); + + public S3MultipartUploadCompleteRequestV1(OMRequest omRequest) { + super(omRequest); + } + + @Override + @SuppressWarnings("methodlength") + public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, + long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) { + MultipartUploadCompleteRequest multipartUploadCompleteRequest = + getOmRequest().getCompleteMultiPartUploadRequest(); + + KeyArgs keyArgs = multipartUploadCompleteRequest.getKeyArgs(); + + List<OzoneManagerProtocolProtos.Part> partsList = + multipartUploadCompleteRequest.getPartsListList(); + Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs); + + String volumeName = keyArgs.getVolumeName(); + String bucketName = keyArgs.getBucketName(); + final String requestedVolume = volumeName; + final String requestedBucket = bucketName; + String keyName = keyArgs.getKeyName(); + String uploadID = keyArgs.getMultipartUploadID(); + String dbMultipartKey; + + ozoneManager.getMetrics().incNumCompleteMultipartUploads(); + + OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager(); + + boolean acquiredLock = false; + OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder( + getOmRequest()); + OMClientResponse omClientResponse = null; + IOException exception = null; + Result result; + try { + keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap); + volumeName = keyArgs.getVolumeName(); + bucketName = keyArgs.getBucketName(); + + acquiredLock = omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK, + volumeName, bucketName); + + validateBucketAndVolume(omMetadataManager, volumeName, bucketName); + + String fileName = OzoneFSUtils.getFileName(keyName); + Path keyPath = Paths.get(keyName); + OMFileRequest.OMPathInfoV1 pathInfoV1 = + OMFileRequest.verifyDirectoryKeysInPath(omMetadataManager, + volumeName, bucketName, keyName, keyPath); + long parentID = pathInfoV1.getLastKnownParentId(); + + dbMultipartKey = omMetadataManager.getMultipartKey(parentID, + fileName, uploadID); + + String dbOzoneKey = omMetadataManager.getOzonePathKey(parentID, fileName); + + String ozoneKey = omMetadataManager.getOzoneKey( + volumeName, bucketName, keyName); + + OmMultipartKeyInfo multipartKeyInfo = + omMetadataManager.getMultipartInfoTable().get(dbMultipartKey); + + // Check for directory exists with same name, if it exists throw error. + if (pathInfoV1.getDirectoryResult() == DIRECTORY_EXISTS) { + throw new OMException("Can not Complete MPU for file: " + keyName + + " as there is already directory in the given path", + NOT_A_FILE); + } + + if (multipartKeyInfo == null) { + throw new OMException( + failureMessage(requestedVolume, requestedBucket, keyName), + OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR); + } + TreeMap<Integer, PartKeyInfo> partKeyInfoMap = + multipartKeyInfo.getPartKeyInfoMap(); + + if (partsList.size() > 0) { + if (partKeyInfoMap.size() == 0) { + LOG.error("Complete MultipartUpload failed for key {} , MPU Key has" + + " no parts in OM, parts given to upload are {}", ozoneKey, + partsList); + throw new OMException( + failureMessage(requestedVolume, requestedBucket, keyName), + OMException.ResultCodes.INVALID_PART); + } + + // First Check for Invalid Part Order. + List< Integer > partNumbers = new ArrayList<>(); + int partsListSize = getPartsListSize(requestedVolume, + requestedBucket, keyName, ozoneKey, partNumbers, partsList); + + List<OmKeyLocationInfo> partLocationInfos = new ArrayList<>(); + long dataSize = getMultipartDataSize(requestedVolume, requestedBucket, + keyName, ozoneKey, partKeyInfoMap, partsListSize, + partLocationInfos, partsList, ozoneManager); + + // All parts have same replication information. Here getting from last + // part. + OmKeyInfo omKeyInfo = getOmKeyInfo(ozoneManager, trxnLogIndex, keyArgs, + volumeName, bucketName, keyName, dbMultipartKey, + omMetadataManager, dbOzoneKey, partKeyInfoMap, + partLocationInfos, dataSize); + + //Find all unused parts. + List< OmKeyInfo > unUsedParts = new ArrayList<>(); + for (Map.Entry< Integer, PartKeyInfo > partKeyInfo : + partKeyInfoMap.entrySet()) { + if (!partNumbers.contains(partKeyInfo.getKey())) { + unUsedParts.add(OmKeyInfo + .getFromProtobuf(partKeyInfo.getValue().getPartKeyInfo())); + } + } + + updateCache(omMetadataManager, dbOzoneKey, dbMultipartKey, omKeyInfo, + trxnLogIndex); + + omResponse.setCompleteMultiPartUploadResponse( + MultipartUploadCompleteResponse.newBuilder() + .setVolume(requestedVolume) + .setBucket(requestedBucket) + .setKey(keyName) + .setHash(DigestUtils.sha256Hex(keyName))); + + omClientResponse = new S3MultipartUploadCompleteResponseV1( + omResponse.build(), dbMultipartKey, omKeyInfo, unUsedParts); + + result = Result.SUCCESS; + } else { + throw new OMException( + failureMessage(requestedVolume, requestedBucket, keyName) + + " because of empty part list", + OMException.ResultCodes.INVALID_REQUEST); + } + + } catch (IOException ex) { + result = Result.FAILURE; + exception = ex; + omClientResponse = new S3MultipartUploadCompleteResponseV1( + createErrorOMResponse(omResponse, exception)); + } finally { + addResponseToDoubleBuffer(trxnLogIndex, omClientResponse, + omDoubleBufferHelper); + if (acquiredLock) { + omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, + volumeName, bucketName); + } + } + + logResult(ozoneManager, multipartUploadCompleteRequest, partsList, + auditMap, volumeName, bucketName, keyName, exception, result); + + return omClientResponse; + } + + protected OmKeyInfo getOmKeyInfoFromKeyTable(String dbOzoneFileKey, + String keyName, OMMetadataManager omMetadataManager) throws IOException { + return OMFileRequest.getOmKeyInfoFromFileTable(true, + omMetadataManager, dbOzoneFileKey, keyName); + } + + @Override + protected OmKeyInfo getOmKeyInfoFromOpenKeyTable(String dbMultipartKey, + String keyName, OMMetadataManager omMetadataManager) throws IOException { + return OMFileRequest.getOmKeyInfoFromFileTable(true, + omMetadataManager, dbMultipartKey, keyName); + } + + @Override + protected void updateCache(OMMetadataManager omMetadataManager, + String ozoneKey, String multipartKey, OmKeyInfo omKeyInfo, + long transactionLogIndex) { + // Update cache. + // 1. Add key entry to key table. + // 2. Delete multipartKey entry from openKeyTable and multipartInfo table. + OMFileRequest.addFileTableCacheEntry(omMetadataManager, ozoneKey, + omKeyInfo, omKeyInfo.getFileName(), transactionLogIndex); + + omMetadataManager.getOpenKeyTable().addCacheEntry( + new CacheKey<>(multipartKey), + new CacheValue<>(Optional.absent(), transactionLogIndex)); + omMetadataManager.getMultipartInfoTable().addCacheEntry( + new CacheKey<>(multipartKey), + new CacheValue<>(Optional.absent(), transactionLogIndex)); + } + + protected void updatePrefixFSOInfo(OmKeyInfo dbOpenKeyInfo, + OmKeyInfo.Builder builder) { + // updates parentID and fileName + builder.setParentObjectID(dbOpenKeyInfo.getParentObjectID()); + builder.setFileName(dbOpenKeyInfo.getFileName()); + } + + private static String failureMessage(String volume, String bucket, + String keyName) { + return "Complete Multipart Upload Failed: volume: " + + volume + " bucket: " + bucket + " key: " + keyName; + } +} + diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java index 20e398e..f593885 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java @@ -96,4 +96,15 @@ public class S3MultipartUploadCompleteResponse extends OMClientResponse { } } -} \ No newline at end of file + protected String getMultipartKey() { + return multipartKey; + } + + protected OmKeyInfo getOmKeyInfo() { + return omKeyInfo; + } + + protected List<OmKeyInfo> getPartsUnusedList() { + return partsUnusedList; + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponseV1.java similarity index 66% copy from hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java copy to hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponseV1.java index 20e398e..bb31dce 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponseV1.java @@ -18,82 +18,74 @@ package org.apache.hadoop.ozone.om.response.s3.multipart; -import java.io.IOException; -import java.util.List; - +import org.apache.hadoop.hdds.utils.db.BatchOperation; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; +import org.apache.hadoop.ozone.om.request.file.OMFileRequest; import org.apache.hadoop.ozone.om.response.CleanupTableInfo; -import org.apache.hadoop.ozone.om.response.OMClientResponse; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .OMResponse; -import org.apache.hadoop.hdds.utils.db.BatchOperation; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; import javax.annotation.Nonnull; +import java.io.IOException; +import java.util.List; import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE; -import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE; -import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.MULTIPARTINFO_TABLE; -import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_KEY_TABLE; +import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE; +import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.MULTIPARTFILEINFO_TABLE; +import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_FILE_TABLE; /** * Response for Multipart Upload Complete request. */ -@CleanupTableInfo(cleanupTables = {OPEN_KEY_TABLE, KEY_TABLE, DELETED_TABLE, - MULTIPARTINFO_TABLE}) -public class S3MultipartUploadCompleteResponse extends OMClientResponse { - private String multipartKey; - private OmKeyInfo omKeyInfo; - private List<OmKeyInfo> partsUnusedList; +@CleanupTableInfo(cleanupTables = {OPEN_FILE_TABLE, FILE_TABLE, DELETED_TABLE, + MULTIPARTFILEINFO_TABLE}) +public class S3MultipartUploadCompleteResponseV1 + extends S3MultipartUploadCompleteResponse { - public S3MultipartUploadCompleteResponse( + public S3MultipartUploadCompleteResponseV1( @Nonnull OMResponse omResponse, @Nonnull String multipartKey, @Nonnull OmKeyInfo omKeyInfo, @Nonnull List<OmKeyInfo> unUsedParts) { - super(omResponse); - this.partsUnusedList = unUsedParts; - this.multipartKey = multipartKey; - this.omKeyInfo = omKeyInfo; + super(omResponse, multipartKey, omKeyInfo, unUsedParts); } /** * For when the request is not successful. * For a successful request, the other constructor should be used. */ - public S3MultipartUploadCompleteResponse(@Nonnull OMResponse omResponse) { + public S3MultipartUploadCompleteResponseV1(@Nonnull OMResponse omResponse) { super(omResponse); checkStatusNotOK(); } + @Override public void addToDBBatch(OMMetadataManager omMetadataManager, BatchOperation batchOperation) throws IOException { omMetadataManager.getOpenKeyTable().deleteWithBatch(batchOperation, - multipartKey); + getMultipartKey()); omMetadataManager.getMultipartInfoTable().deleteWithBatch(batchOperation, - multipartKey); + getMultipartKey()); - String ozoneKey = omMetadataManager.getOzoneKey(omKeyInfo.getVolumeName(), - omKeyInfo.getBucketName(), omKeyInfo.getKeyName()); - omMetadataManager.getKeyTable().putWithBatch(batchOperation, ozoneKey, - omKeyInfo); + String dbFileKey = OMFileRequest.addToFileTable(omMetadataManager, + batchOperation, getOmKeyInfo()); - if (!partsUnusedList.isEmpty()) { + if (!getPartsUnusedList().isEmpty()) { // Add unused parts to deleted key table. RepeatedOmKeyInfo repeatedOmKeyInfo = omMetadataManager.getDeletedTable() - .get(ozoneKey); + .get(dbFileKey); if (repeatedOmKeyInfo == null) { - repeatedOmKeyInfo = new RepeatedOmKeyInfo(partsUnusedList); + repeatedOmKeyInfo = new RepeatedOmKeyInfo(getPartsUnusedList()); } else { - repeatedOmKeyInfo.addOmKeyInfo(omKeyInfo); + repeatedOmKeyInfo.addOmKeyInfo(getOmKeyInfo()); } omMetadataManager.getDeletedTable().putWithBatch(batchOperation, - ozoneKey, repeatedOmKeyInfo); + dbFileKey, repeatedOmKeyInfo); } } +} -} \ No newline at end of file diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java index 9f6cff8..16cb4ae 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java @@ -115,7 +115,7 @@ public class TestS3MultipartRequest { keyName); S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest = - new S3InitiateMultipartUploadRequest(omRequest); + getS3InitiateMultipartUploadReq(omRequest); OMRequest modifiedRequest = s3InitiateMultipartUploadRequest.preExecute(ozoneManager); @@ -204,7 +204,7 @@ public class TestS3MultipartRequest { keyName, multipartUploadID, partList); S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest = - new S3MultipartUploadCompleteRequest(omRequest); + getS3MultipartUploadCompleteReq(omRequest); OMRequest modifiedRequest = s3MultipartUploadCompleteRequest.preExecute(ozoneManager); @@ -247,6 +247,11 @@ public class TestS3MultipartRequest { return modifiedRequest; } + protected S3MultipartUploadCompleteRequest getS3MultipartUploadCompleteReq( + OMRequest omRequest) { + return new S3MultipartUploadCompleteRequest(omRequest); + } + protected S3MultipartUploadCommitPartRequest getS3MultipartUploadCommitReq( OMRequest omRequest) { return new S3MultipartUploadCommitPartRequest(omRequest); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java index a04f51f..3d399b1 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequest.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.om.request.s3.multipart; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -55,7 +56,7 @@ public class TestS3MultipartUploadCompleteRequest public void testValidateAndUpdateCacheSuccess() throws Exception { String volumeName = UUID.randomUUID().toString(); String bucketName = UUID.randomUUID().toString(); - String keyName = UUID.randomUUID().toString(); + String keyName = getKeyName(); TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName, omMetadataManager); @@ -64,7 +65,7 @@ public class TestS3MultipartUploadCompleteRequest bucketName, keyName); S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest = - new S3InitiateMultipartUploadRequest(initiateMPURequest); + getS3InitiateMultipartUploadReq(initiateMPURequest); OMClientResponse omClientResponse = s3InitiateMultipartUploadRequest.validateAndUpdateCache(ozoneManager, @@ -78,27 +79,25 @@ public class TestS3MultipartUploadCompleteRequest bucketName, keyName, clientID, multipartUploadID, 1); S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest = - new S3MultipartUploadCommitPartRequest(commitMultipartRequest); + getS3MultipartUploadCommitReq(commitMultipartRequest); // Add key to open key table. - TestOMRequestUtils.addKeyToTable(true, volumeName, bucketName, - keyName, clientID, HddsProtos.ReplicationType.RATIS, - HddsProtos.ReplicationFactor.ONE, omMetadataManager); + addKeyToTable(volumeName, bucketName, keyName, clientID); s3MultipartUploadCommitPartRequest.validateAndUpdateCache(ozoneManager, 2L, ozoneManagerDoubleBufferHelper); List<Part> partList = new ArrayList<>(); - partList.add(Part.newBuilder().setPartName( - omMetadataManager.getOzoneKey(volumeName, bucketName, keyName) + - clientID).setPartNumber(1).build()); + String partName = getPartName(volumeName, bucketName, keyName, clientID); + partList.add(Part.newBuilder().setPartName(partName).setPartNumber(1) + .build()); OMRequest completeMultipartRequest = doPreExecuteCompleteMPU(volumeName, bucketName, keyName, multipartUploadID, partList); S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest = - new S3MultipartUploadCompleteRequest(completeMultipartRequest); + getS3MultipartUploadCompleteReq(completeMultipartRequest); omClientResponse = s3MultipartUploadCompleteRequest.validateAndUpdateCache(ozoneManager, @@ -107,14 +106,71 @@ public class TestS3MultipartUploadCompleteRequest Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK, omClientResponse.getOMResponse().getStatus()); - String multipartKey = omMetadataManager.getMultipartKey(volumeName, - bucketName, keyName, multipartUploadID); + String multipartKey = getMultipartKey(volumeName, bucketName, keyName, + multipartUploadID); Assert.assertNull(omMetadataManager.getOpenKeyTable().get(multipartKey)); Assert.assertNull( omMetadataManager.getMultipartInfoTable().get(multipartKey)); Assert.assertNotNull(omMetadataManager.getKeyTable().get( - omMetadataManager.getOzoneKey(volumeName, bucketName, keyName))); + getOzoneDBKey(volumeName, bucketName, keyName))); + } + + @Test + public void testInvalidPartOrderError() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = getKeyName(); + + TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName, + omMetadataManager); + + OMRequest initiateMPURequest = doPreExecuteInitiateMPU(volumeName, + bucketName, keyName); + + S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest = + getS3InitiateMultipartUploadReq(initiateMPURequest); + + OMClientResponse omClientResponse = + s3InitiateMultipartUploadRequest.validateAndUpdateCache( + ozoneManager, 1L, ozoneManagerDoubleBufferHelper); + + long clientID = Time.now(); + String multipartUploadID = omClientResponse.getOMResponse() + .getInitiateMultiPartUploadResponse().getMultipartUploadID(); + + OMRequest commitMultipartRequest = doPreExecuteCommitMPU(volumeName, + bucketName, keyName, clientID, multipartUploadID, 1); + + S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest = + getS3MultipartUploadCommitReq(commitMultipartRequest); + + // Add key to open key table. + addKeyToTable(volumeName, bucketName, keyName, clientID); + + s3MultipartUploadCommitPartRequest.validateAndUpdateCache(ozoneManager, + 2L, ozoneManagerDoubleBufferHelper); + + List<Part> partList = new ArrayList<>(); + + String partName = getPartName(volumeName, bucketName, keyName, clientID); + partList.add(Part.newBuilder().setPartName(partName).setPartNumber(23) + .build()); + partList.add(Part.newBuilder().setPartName(partName).setPartNumber(1) + .build()); + + OMRequest completeMultipartRequest = doPreExecuteCompleteMPU(volumeName, + bucketName, keyName, multipartUploadID, partList); + + S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest = + getS3MultipartUploadCompleteReq(completeMultipartRequest); + + omClientResponse = + s3MultipartUploadCompleteRequest.validateAndUpdateCache( + ozoneManager, 3L, ozoneManagerDoubleBufferHelper); + + Assert.assertEquals(OzoneManagerProtocolProtos.Status.INVALID_PART_ORDER, + omClientResponse.getOMResponse().getStatus()); } @Test @@ -129,7 +185,7 @@ public class TestS3MultipartUploadCompleteRequest bucketName, keyName, UUID.randomUUID().toString(), partList); S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest = - new S3MultipartUploadCompleteRequest(completeMultipartRequest); + getS3MultipartUploadCompleteReq(completeMultipartRequest); OMClientResponse omClientResponse = s3MultipartUploadCompleteRequest.validateAndUpdateCache(ozoneManager, @@ -153,7 +209,7 @@ public class TestS3MultipartUploadCompleteRequest bucketName, keyName, UUID.randomUUID().toString(), partList); S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest = - new S3MultipartUploadCompleteRequest(completeMultipartRequest); + getS3MultipartUploadCompleteReq(completeMultipartRequest); OMClientResponse omClientResponse = s3MultipartUploadCompleteRequest.validateAndUpdateCache(ozoneManager, @@ -180,7 +236,7 @@ public class TestS3MultipartUploadCompleteRequest // Doing complete multipart upload request with out initiate. S3MultipartUploadCompleteRequest s3MultipartUploadCompleteRequest = - new S3MultipartUploadCompleteRequest(completeMultipartRequest); + getS3MultipartUploadCompleteReq(completeMultipartRequest); OMClientResponse omClientResponse = s3MultipartUploadCompleteRequest.validateAndUpdateCache(ozoneManager, @@ -191,5 +247,35 @@ public class TestS3MultipartUploadCompleteRequest omClientResponse.getOMResponse().getStatus()); } + + protected void addKeyToTable(String volumeName, String bucketName, + String keyName, long clientID) throws Exception { + TestOMRequestUtils.addKeyToTable(true, volumeName, bucketName, + keyName, clientID, HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.ONE, omMetadataManager); + } + + protected String getMultipartKey(String volumeName, String bucketName, + String keyName, String multipartUploadID) throws IOException { + return omMetadataManager.getMultipartKey(volumeName, + bucketName, keyName, multipartUploadID); + } + + private String getPartName(String volumeName, String bucketName, + String keyName, long clientID) throws IOException { + + String dbOzoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName, + keyName); + return dbOzoneKey + clientID; + } + + protected String getOzoneDBKey(String volumeName, String bucketName, + String keyName) throws IOException { + return omMetadataManager.getOzoneKey(volumeName, bucketName, keyName); + } + + protected String getKeyName() { + return UUID.randomUUID().toString(); + } } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestV1.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestV1.java new file mode 100644 index 0000000..cd5051f --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestV1.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.request.s3.multipart; + +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils; +import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; +import org.apache.hadoop.ozone.om.request.TestOMRequestUtils; +import org.apache.hadoop.ozone.om.request.file.OMFileRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.util.Time; +import org.junit.Assert; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Iterator; +import java.util.UUID; + +/** + * Tests S3 Multipart Upload Complete request. + */ +public class TestS3MultipartUploadCompleteRequestV1 + extends TestS3MultipartUploadCompleteRequest { + + @BeforeClass + public static void init() { + OzoneManagerRatisUtils.setBucketFSOptimized(true); + } + + protected String getKeyName() { + String parentDir = UUID.randomUUID().toString() + "/a/b/c"; + String fileName = "file1"; + String keyName = parentDir + OzoneConsts.OM_KEY_PREFIX + fileName; + return keyName; + } + + protected void addKeyToTable(String volumeName, String bucketName, + String keyName, long clientID) throws Exception { + // need to initialize parentID + String parentDir = OzoneFSUtils.getParentDir(keyName); + Assert.assertNotEquals("Parent doesn't exists!", parentDir, keyName); + + // add parentDir to dirTable + long parentID = getParentID(volumeName, bucketName, keyName); + long txnId = 50; + long objectId = parentID + 1; + + OmKeyInfo omKeyInfoV1 = + TestOMRequestUtils.createOmKeyInfo(volumeName, bucketName, keyName, + HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.ONE, objectId, parentID, txnId, + Time.now()); + + // add key to openFileTable + String fileName = OzoneFSUtils.getFileName(keyName); + omKeyInfoV1.setKeyName(fileName); + TestOMRequestUtils.addFileToKeyTable(true, false, + fileName, omKeyInfoV1, clientID, omKeyInfoV1.getObjectID(), + omMetadataManager); + } + + protected String getMultipartKey(String volumeName, String bucketName, + String keyName, String multipartUploadID) throws IOException { + OzoneFileStatus keyStatus = OMFileRequest.getOMKeyInfoIfExists( + omMetadataManager, volumeName, + bucketName, keyName, 0); + + Assert.assertNotNull("key not found in DB!", keyStatus); + + return omMetadataManager.getMultipartKey(keyStatus.getKeyInfo() + .getParentObjectID(), keyStatus.getTrimmedName(), + multipartUploadID); + } + + private long getParentID(String volumeName, String bucketName, + String keyName) throws IOException { + Path keyPath = Paths.get(keyName); + Iterator<Path> elements = keyPath.iterator(); + String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName); + OmBucketInfo omBucketInfo = + omMetadataManager.getBucketTable().get(bucketKey); + + return OMFileRequest.getParentID(omBucketInfo.getObjectID(), + elements, keyName, omMetadataManager); + } + + protected String getOzoneDBKey(String volumeName, String bucketName, + String keyName) throws IOException { + long parentID = getParentID(volumeName, bucketName, keyName); + String fileName = OzoneFSUtils.getFileName(keyName); + return omMetadataManager.getOzonePathKey(parentID, fileName); + } + + protected S3MultipartUploadCompleteRequest getS3MultipartUploadCompleteReq( + OMRequest omRequest) { + return new S3MultipartUploadCompleteRequestV1(omRequest); + } + + protected S3MultipartUploadCommitPartRequest getS3MultipartUploadCommitReq( + OMRequest omRequest) { + return new S3MultipartUploadCommitPartRequestV1(omRequest); + } + + protected S3InitiateMultipartUploadRequest getS3InitiateMultipartUploadReq( + OMRequest initiateMPURequest) { + return new S3InitiateMultipartUploadRequestV1(initiateMPURequest); + } + +} + diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java index 106ae61..6f4d6fa 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java @@ -268,6 +268,27 @@ public class TestS3MultipartResponse { openPartKeyInfoToBeDeleted, isRatisEnabled, omBucketInfo); } + @SuppressWarnings("checkstyle:ParameterNumber") + public S3MultipartUploadCompleteResponse createS3CompleteMPUResponseV1( + String volumeName, String bucketName, long parentID, String keyName, + String multipartUploadID, OmKeyInfo omKeyInfo, + OzoneManagerProtocolProtos.Status status, + List<OmKeyInfo> unUsedParts) { + + String multipartKey = getMultipartKey(parentID, keyName, multipartUploadID); + + OMResponse omResponse = OMResponse.newBuilder() + .setCmdType(OzoneManagerProtocolProtos.Type.CompleteMultiPartUpload) + .setStatus(status).setSuccess(true) + .setCompleteMultiPartUploadResponse( + OzoneManagerProtocolProtos.MultipartUploadCompleteResponse + .newBuilder().setBucket(bucketName) + .setVolume(volumeName).setKey(keyName)).build(); + + return new S3MultipartUploadCompleteResponseV1(omResponse, multipartKey, + omKeyInfo, unUsedParts); + } + private String getMultipartKey(long parentID, String keyName, String multipartUploadID) { String fileName = OzoneFSUtils.getFileName(keyName); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadCompleteResponseV1.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadCompleteResponseV1.java new file mode 100644 index 0000000..2683273 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadCompleteResponseV1.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.response.s3.multipart; + +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils; +import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; +import org.apache.hadoop.ozone.om.request.TestOMRequestUtils; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo; +import org.apache.hadoop.util.Time; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +/** + * Test multipart upload complete response. + */ +public class TestS3MultipartUploadCompleteResponseV1 + extends TestS3MultipartResponse { + + private String dirName = "a/b/c/"; + + private long parentID; + + @Test + public void testAddDBToBatch() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = getKeyName(); + String multipartUploadID = UUID.randomUUID().toString(); + + TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName, + omMetadataManager); + + long txnId = 50; + long objectId = parentID + 1; + String fileName = OzoneFSUtils.getFileName(keyName); + String dbMultipartKey = omMetadataManager.getMultipartKey(parentID, + fileName, multipartUploadID); + long clientId = Time.now(); + String dbOpenKey = omMetadataManager.getOpenFileName(parentID, fileName, + clientId); + String dbKey = omMetadataManager.getOzonePathKey(parentID, fileName); + OmKeyInfo omKeyInfoV1 = + TestOMRequestUtils.createOmKeyInfo(volumeName, bucketName, keyName, + HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.ONE, objectId, parentID, txnId, + Time.now()); + + // add key to openFileTable + omKeyInfoV1.setKeyName(fileName); + TestOMRequestUtils.addFileToKeyTable(true, false, + fileName, omKeyInfoV1, clientId, omKeyInfoV1.getObjectID(), + omMetadataManager); + + addS3MultipartUploadCommitPartResponseV1(volumeName, bucketName, keyName, + multipartUploadID, dbOpenKey); + + List<OmKeyInfo> unUsedParts = new ArrayList<>(); + S3MultipartUploadCompleteResponse s3MultipartUploadCompleteResponse = + createS3CompleteMPUResponseV1(volumeName, bucketName, parentID, + keyName, multipartUploadID, omKeyInfoV1, + OzoneManagerProtocolProtos.Status.OK, unUsedParts); + + s3MultipartUploadCompleteResponse.addToDBBatch(omMetadataManager, + batchOperation); + + omMetadataManager.getStore().commitBatchOperation(batchOperation); + + Assert.assertNotNull(omMetadataManager.getKeyTable().get(dbKey)); + Assert.assertNull( + omMetadataManager.getMultipartInfoTable().get(dbMultipartKey)); + Assert.assertNull( + omMetadataManager.getOpenKeyTable().get(dbMultipartKey)); + + // As no parts are created, so no entries should be there in delete table. + Assert.assertEquals(0, omMetadataManager.countRowsInTable( + omMetadataManager.getDeletedTable())); + } + + @Test + public void testAddDBToBatchWithParts() throws Exception { + + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + String keyName = getKeyName(); + + TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName, + omMetadataManager); + createParentPath(volumeName, bucketName); + + String multipartUploadID = UUID.randomUUID().toString(); + + int deleteEntryCount = 0; + + String fileName = OzoneFSUtils.getFileName(keyName); + String dbMultipartKey = omMetadataManager.getMultipartKey(parentID, + fileName, multipartUploadID); + + S3InitiateMultipartUploadResponse s3InitiateMultipartUploadResponseV1 = + addS3InitiateMultipartUpload(volumeName, bucketName, keyName, + multipartUploadID); + + // Add some dummy parts for testing. + // Not added any key locations, as this just test is to see entries are + // adding to delete table or not. + OmMultipartKeyInfo omMultipartKeyInfo = + s3InitiateMultipartUploadResponseV1.getOmMultipartKeyInfo(); + + OmKeyInfo omKeyInfoV1 = commitS3MultipartUpload(volumeName, bucketName, + keyName, multipartUploadID, fileName, dbMultipartKey, + omMultipartKeyInfo); + // After commits, it adds an entry to the deleted table. + deleteEntryCount++; + + OmKeyInfo omKeyInfo = + TestOMRequestUtils.createOmKeyInfo(volumeName, bucketName, keyName, + HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.ONE, + parentID + 10, + parentID, 100, Time.now()); + List<OmKeyInfo> unUsedParts = new ArrayList<>(); + unUsedParts.add(omKeyInfo); + S3MultipartUploadCompleteResponse s3MultipartUploadCompleteResponse = + createS3CompleteMPUResponseV1(volumeName, bucketName, parentID, + keyName, multipartUploadID, omKeyInfoV1, + OzoneManagerProtocolProtos.Status.OK, unUsedParts); + + s3MultipartUploadCompleteResponse.addToDBBatch(omMetadataManager, + batchOperation); + + omMetadataManager.getStore().commitBatchOperation(batchOperation); + String dbKey = omMetadataManager.getOzonePathKey(parentID, + omKeyInfoV1.getFileName()); + Assert.assertNotNull(omMetadataManager.getKeyTable().get(dbKey)); + Assert.assertNull( + omMetadataManager.getMultipartInfoTable().get(dbMultipartKey)); + Assert.assertNull( + omMetadataManager.getOpenKeyTable().get(dbMultipartKey)); + + // As 1 unused parts exists, so 1 unused entry should be there in delete + // table. + deleteEntryCount++; + Assert.assertEquals(deleteEntryCount, omMetadataManager.countRowsInTable( + omMetadataManager.getDeletedTable())); + } + + private OmKeyInfo commitS3MultipartUpload(String volumeName, + String bucketName, String keyName, String multipartUploadID, + String fileName, String multipartKey, + OmMultipartKeyInfo omMultipartKeyInfo) throws IOException { + + PartKeyInfo part1 = createPartKeyInfoV1(volumeName, bucketName, parentID, + fileName, 1); + + addPart(1, part1, omMultipartKeyInfo); + + long clientId = Time.now(); + String openKey = omMetadataManager.getOpenFileName(parentID, fileName, + clientId); + + S3MultipartUploadCommitPartResponse s3MultipartUploadCommitPartResponse = + createS3CommitMPUResponseV1(volumeName, bucketName, parentID, + keyName, multipartUploadID, + omMultipartKeyInfo.getPartKeyInfo(1), + omMultipartKeyInfo, + OzoneManagerProtocolProtos.Status.OK, openKey); + + s3MultipartUploadCommitPartResponse.checkAndUpdateDB(omMetadataManager, + batchOperation); + + Assert.assertNull(omMetadataManager.getOpenKeyTable().get(multipartKey)); + Assert.assertNull( + omMetadataManager.getMultipartInfoTable().get(multipartKey)); + + omMetadataManager.getStore().commitBatchOperation(batchOperation); + + // As 1 parts are created, so 1 entry should be there in delete table. + Assert.assertEquals(1, omMetadataManager.countRowsInTable( + omMetadataManager.getDeletedTable())); + + String part1DeletedKeyName = + omMultipartKeyInfo.getPartKeyInfo(1).getPartName(); + + Assert.assertNotNull(omMetadataManager.getDeletedTable().get( + part1DeletedKeyName)); + + RepeatedOmKeyInfo ro = + omMetadataManager.getDeletedTable().get(part1DeletedKeyName); + OmKeyInfo omPartKeyInfo = OmKeyInfo.getFromProtobuf(part1.getPartKeyInfo()); + Assert.assertEquals(omPartKeyInfo, ro.getOmKeyInfoList().get(0)); + + return omPartKeyInfo; + } + + private S3InitiateMultipartUploadResponse addS3InitiateMultipartUpload( + String volumeName, String bucketName, String keyName, + String multipartUploadID) throws IOException { + + S3InitiateMultipartUploadResponse s3InitiateMultipartUploadResponseV1 = + createS3InitiateMPUResponseV1(volumeName, bucketName, parentID, + keyName, multipartUploadID, new ArrayList<>()); + + s3InitiateMultipartUploadResponseV1.addToDBBatch(omMetadataManager, + batchOperation); + + return s3InitiateMultipartUploadResponseV1; + } + + private String getKeyName() { + return dirName + UUID.randomUUID().toString(); + } + + private void createParentPath(String volumeName, String bucketName) + throws Exception { + // Create parent dirs for the path + parentID = TestOMRequestUtils.addParentsToDirTable(volumeName, bucketName, + dirName, omMetadataManager); + } + + private void addS3MultipartUploadCommitPartResponseV1(String volumeName, + String bucketName, String keyName, String multipartUploadID, + String openKey) throws IOException { + S3MultipartUploadCommitPartResponse s3MultipartUploadCommitPartResponse = + createS3CommitMPUResponseV1(volumeName, bucketName, parentID, + keyName, multipartUploadID, null, null, + OzoneManagerProtocolProtos.Status.OK, openKey); + + s3MultipartUploadCommitPartResponse.addToDBBatch(omMetadataManager, + batchOperation); + + omMetadataManager.getStore().commitBatchOperation(batchOperation); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
