This is an automated email from the ASF dual-hosted git repository.

captainzmc pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit be9641f9fa281828621d5137fbbd7a1384096301
Author: hao guo <[email protected]>
AuthorDate: Fri Nov 19 11:21:55 2021 +0800

    HDDS-5879. [Ozone-Streaming] OzoneBucket add the createMultipartStreamKey method (#2760)
---
 .../apache/hadoop/ozone/client/OzoneBucket.java    | 15 +++++
 .../ozone/client/protocol/ClientProtocol.java      | 18 ++++++
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  | 64 ++++++++++++++++++++++
 .../client/rpc/TestOzoneRpcClientWithRatis.java    | 53 ++++++++++++++++++
 4 files changed, 150 insertions(+)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index a11c2dbac5..68ea6304ee 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -810,6 +810,21 @@ public class OzoneBucket extends WithMetadata {
         uploadID);
   }
 
+  /**
+   * Create a part key for a multipart upload key.
+   * @param key
+   * @param size
+   * @param partNumber
+   * @param uploadID
+   * @return OzoneDataStreamOutput
+   * @throws IOException
+   */
+  public OzoneDataStreamOutput createMultipartStreamKey(String key,
+      long size, int partNumber, String uploadID) throws IOException {
+    return proxy.createMultipartStreamKey(volumeName, name,
+            key, size, partNumber, uploadID);
+  }
+
   /**
    * Complete Multipart upload. This will combine all the parts and make the
    * key visible in ozone.
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index 8f7f385628..7660fc0e46 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -494,6 +494,24 @@ public interface ClientProtocol {
                                        int partNumber, String uploadID)
       throws IOException;
 
+  /**
+   * Create a part key for a multipart upload key.
+   * @param volumeName
+   * @param bucketName
+   * @param keyName
+   * @param size
+   * @param partNumber
+   * @param uploadID
+   * @return OzoneDataStreamOutput
+   * @throws IOException
+   */
+  OzoneDataStreamOutput createMultipartStreamKey(String volumeName,
+                                                 String bucketName,
+                                                 String keyName, long size,
+                                                 int partNumber,
+                                                 String uploadID)
+      throws IOException;
+
   /**
    * Complete Multipart upload. This will combine all the parts and make the
    * key visible in ozone.
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index c52352bdca..62b9a868b3 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -1570,6 +1570,70 @@ public class RpcClient implements ClientProtocol {
     }
   }
 
+  @Override
+  public OzoneDataStreamOutput createMultipartStreamKey(
+      String volumeName,
+      String bucketName,
+      String keyName,
+      long size,
+      int partNumber,
+      String uploadID)
+      throws IOException {
+    verifyVolumeName(volumeName);
+    verifyBucketName(bucketName);
+    if (checkKeyNameEnabled) {
+      HddsClientUtils.verifyKeyName(keyName);
+    }
+    HddsClientUtils.checkNotNull(keyName, uploadID);
+    Preconditions.checkArgument(partNumber > 0 && partNumber <= 10000, "Part " +
+        "number should be greater than zero and less than or equal to 10000");
+    Preconditions.checkArgument(size >= 0, "size should be greater than or " +
+        "equal to zero");
+    String requestId = UUID.randomUUID().toString();
+
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setDataSize(size)
+        .setIsMultipartKey(true)
+        .setMultipartUploadID(uploadID)
+        .setMultipartUploadPartNumber(partNumber)
+        .setAcls(getAclList())
+        .build();
+
+    OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
+
+    KeyDataStreamOutput keyOutputStream =
+        new KeyDataStreamOutput.Builder()
+            .setHandler(openKey)
+            .setXceiverClientManager(xceiverClientManager)
+            .setOmClient(ozoneManagerClient)
+            .setRequestID(requestId)
+            .setReplicationConfig(openKey.getKeyInfo().getReplicationConfig())
+            .setMultipartNumber(partNumber)
+            .setMultipartUploadID(uploadID)
+            .setIsMultipartKey(true)
+            .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
+            .setConfig(clientConfig)
+            .build();
+    keyOutputStream
+        .addPreallocateBlocks(
+            openKey.getKeyInfo().getLatestVersionLocations(),
+            openKey.getOpenVersion());
+
+    FileEncryptionInfo feInfo = openKey.getKeyInfo().getFileEncryptionInfo();
+    if (feInfo != null) {
+      // todo: need to support file encrypt,
+      // https://issues.apache.org/jira/browse/HDDS-5892
+      throw new UnsupportedOperationException(
+          "FileEncryptionInfo is not yet supported in " +
+              "createMultipartStreamKey");
+    } else {
+      return new OzoneDataStreamOutput(keyOutputStream);
+    }
+  }
+
   @Override
   public OmMultipartUploadCompleteInfo completeMultipartUpload(
       String volumeName, String bucketName, String keyName, String uploadID,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
index 362a218af2..d9c67609e6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
@@ -19,10 +19,12 @@
 package org.apache.hadoop.ozone.client.rpc;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.UUID;
 
+import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -31,12 +33,15 @@ import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
 import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.Assert;
 import org.junit.jupiter.api.BeforeAll;
@@ -44,6 +49,7 @@ import org.junit.jupiter.api.Test;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
 /**
@@ -155,4 +161,51 @@ public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract {
       }
     }
   }
+
+  @Test
+  public void testMultiPartUploadWithStream() throws IOException {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    byte[] sampleData = new byte[1024 * 8];
+
+    int valueLength = sampleData.length;
+
+    getStore().createVolume(volumeName);
+    OzoneVolume volume = getStore().getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    ReplicationConfig replicationConfig =
+        ReplicationConfig.fromTypeAndFactor(
+            ReplicationType.RATIS,
+            THREE);
+
+    OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
+        replicationConfig);
+
+    assertNotNull(multipartInfo);
+    String uploadID = multipartInfo.getUploadID();
+    Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
+    Assert.assertEquals(bucketName, multipartInfo.getBucketName());
+    Assert.assertEquals(keyName, multipartInfo.getKeyName());
+    assertNotNull(multipartInfo.getUploadID());
+
+    OzoneDataStreamOutput ozoneStreamOutput = bucket.createMultipartStreamKey(
+        keyName, valueLength, 1, uploadID);
+    ozoneStreamOutput.write(ByteBuffer.wrap(sampleData), 0,
+        valueLength);
+    ozoneStreamOutput.close();
+
+    OzoneMultipartUploadPartListParts parts =
+        bucket.listParts(keyName, uploadID, 0, 1);
+
+    Assert.assertEquals(parts.getPartInfoList().size(), 1);
+
+    OzoneMultipartUploadPartListParts.PartInfo partInfo =
+        parts.getPartInfoList().get(0);
+    Assert.assertEquals(valueLength, partInfo.getSize());
+
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
