This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new b9496f9eaf HDDS-5892. RpcClient#createMultipartStreamKey method, need
to support file encrypt (#4779)
b9496f9eaf is described below
commit b9496f9eaf0b9239fcd8b0084d859b4501ccc192
Author: hao guo <[email protected]>
AuthorDate: Thu Jun 22 01:07:44 2023 +0800
HDDS-5892. RpcClient#createMultipartStreamKey method, need to support file
encrypt (#4779)
---
.../ozone/client/io/OzoneDataStreamOutput.java | 19 ++++-
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 82 ++++++----------------
.../client/rpc/TestOzoneAtRestEncryption.java | 61 +++++++++++++++-
3 files changed, 99 insertions(+), 63 deletions(-)
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
index f937689e90..a1e4731a56 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
@@ -16,10 +16,12 @@
*/
package org.apache.hadoop.ozone.client.io;
+import org.apache.hadoop.crypto.CryptoOutputStream;
import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
/**
@@ -55,9 +57,20 @@ public class OzoneDataStreamOutput extends
ByteBufferOutputStream {
}
public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
- if (byteBufferStreamOutput instanceof KeyDataStreamOutput) {
- return ((KeyDataStreamOutput)
- byteBufferStreamOutput).getCommitUploadPartInfo();
+ if (byteBufferStreamOutput instanceof OzoneOutputStream) {
+ OutputStream outputStream =
+ ((OzoneOutputStream) byteBufferStreamOutput).getOutputStream();
+ if (outputStream instanceof KeyDataStreamOutput) {
+ return ((KeyDataStreamOutput)
+ outputStream).getCommitUploadPartInfo();
+ } else if (outputStream instanceof CryptoOutputStream) {
+ OutputStream wrappedStream =
+ ((CryptoOutputStream) outputStream).getWrappedStream();
+ if (wrappedStream instanceof KeyDataStreamOutput) {
+ return ((KeyDataStreamOutput) wrappedStream)
+ .getCommitUploadPartInfo();
+ }
+ }
}
// Otherwise return null.
return null;
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index f8fd36d465..166efc842d 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -1670,14 +1670,9 @@ public class RpcClient implements ClientProtocol {
return multipartInfo;
}
- @Override
- public OzoneOutputStream createMultipartKey(String volumeName,
- String bucketName,
- String keyName,
- long size,
- int partNumber,
- String uploadID)
- throws IOException {
+ private OpenKeySession newMultipartOpenKey(
+ String volumeName, String bucketName, String keyName,
+ long size, int partNumber, String uploadID) throws IOException {
verifyVolumeName(volumeName);
verifyBucketName(bucketName);
if (checkKeyNameEnabled) {
@@ -1698,27 +1693,21 @@ public class RpcClient implements ClientProtocol {
.setMultipartUploadPartNumber(partNumber)
.setAcls(getAclList())
.build();
+ return ozoneManagerClient.openKey(keyArgs);
+ }
- OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
+ @Override
+ public OzoneOutputStream createMultipartKey(
+ String volumeName, String bucketName, String keyName,
+ long size, int partNumber, String uploadID) throws IOException {
+ final OpenKeySession openKey = newMultipartOpenKey(
+ volumeName, bucketName, keyName, size, partNumber, uploadID);
KeyOutputStream keyOutputStream = createKeyOutputStream(openKey)
.setMultipartNumber(partNumber)
.setMultipartUploadID(uploadID)
.setIsMultipartKey(true)
.build();
- keyOutputStream.addPreallocateBlocks(
- openKey.getKeyInfo().getLatestVersionLocations(),
- openKey.getOpenVersion());
- FileEncryptionInfo feInfo = openKey.getKeyInfo().getFileEncryptionInfo();
- if (feInfo != null) {
- KeyProvider.KeyVersion decrypted = getDEK(feInfo);
- final CryptoOutputStream cryptoOut =
- new CryptoOutputStream(keyOutputStream,
- OzoneKMSUtil.getCryptoCodec(conf, feInfo),
- decrypted.getMaterial(), feInfo.getIV());
- return new OzoneOutputStream(cryptoOut);
- } else {
- return new OzoneOutputStream(keyOutputStream);
- }
+ return createOutputStream(openKey, keyOutputStream);
}
@Override
@@ -1730,29 +1719,8 @@ public class RpcClient implements ClientProtocol {
int partNumber,
String uploadID)
throws IOException {
- verifyVolumeName(volumeName);
- verifyBucketName(bucketName);
- if (checkKeyNameEnabled) {
- HddsClientUtils.verifyKeyName(keyName);
- }
- HddsClientUtils.checkNotNull(keyName, uploadID);
- Preconditions.checkArgument(partNumber > 0 && partNumber <= 10000, "Part "
+
- "number should be greater than zero and less than or equal to 10000");
- Preconditions.checkArgument(size >= 0, "size should be greater than or " +
- "equal to zero");
-
- OmKeyArgs keyArgs = new OmKeyArgs.Builder()
- .setVolumeName(volumeName)
- .setBucketName(bucketName)
- .setKeyName(keyName)
- .setDataSize(size)
- .setIsMultipartKey(true)
- .setMultipartUploadID(uploadID)
- .setMultipartUploadPartNumber(partNumber)
- .setAcls(getAclList())
- .build();
-
- OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
+ final OpenKeySession openKey = newMultipartOpenKey(
+ volumeName, bucketName, keyName, size, partNumber, uploadID);
KeyDataStreamOutput keyOutputStream =
new KeyDataStreamOutput.Builder()
@@ -1770,17 +1738,9 @@ public class RpcClient implements ClientProtocol {
.addPreallocateBlocks(
openKey.getKeyInfo().getLatestVersionLocations(),
openKey.getOpenVersion());
-
- FileEncryptionInfo feInfo = openKey.getKeyInfo().getFileEncryptionInfo();
- if (feInfo != null) {
- // todo: need to support file encrypt,
- // https://issues.apache.org/jira/browse/HDDS-5892
- throw new UnsupportedOperationException(
- "FileEncryptionInfo is not yet supported in " +
- "createMultipartStreamKey");
- } else {
- return new OzoneDataStreamOutput(keyOutputStream);
- }
+ final OzoneOutputStream out = createSecureOutputStream(
+ openKey, keyOutputStream, null);
+ return new OzoneDataStreamOutput(out != null ? out : keyOutputStream);
}
@Override
@@ -2181,12 +2141,16 @@ public class RpcClient implements ClientProtocol {
openKey, keyOutputStream, null);
return new OzoneDataStreamOutput(out != null ? out : keyOutputStream);
}
-
private OzoneOutputStream createOutputStream(OpenKeySession openKey)
throws IOException {
-
KeyOutputStream keyOutputStream = createKeyOutputStream(openKey)
.build();
+ return createOutputStream(openKey, keyOutputStream);
+ }
+
+ private OzoneOutputStream createOutputStream(OpenKeySession openKey,
+ KeyOutputStream keyOutputStream)
+ throws IOException {
keyOutputStream
.addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(),
openKey.getOpenVersion());
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
index f37cf405f6..c6698c663c 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.client.rpc;
import java.io.File;
import java.io.IOException;
import java.net.URI;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
@@ -465,6 +466,32 @@ public class TestOzoneAtRestEncryption {
testMultipartUploadWithEncryption(bucket, 3);
}
+ @Test
+ public void testMPUwithOneStreamPart() throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ OzoneBucket bucket = createVolumeAndBucket(volumeName, bucketName);
+ testMultipartUploadWithEncryption(bucket, 1, true);
+ }
+
+ @Test
+ public void testMPUwithTwoStreamParts() throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ OzoneBucket bucket = createVolumeAndBucket(volumeName, bucketName);
+ testMultipartUploadWithEncryption(bucket, 2, true);
+ }
+
+ @Test
+ public void testMPUwithThreeStreamPartsOverride() throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ OzoneBucket bucket = createVolumeAndBucket(volumeName, bucketName);
+ testMultipartUploadWithEncryption(bucket, 3);
+
+ // override the key and check content
+ testMultipartUploadWithEncryption(bucket, 3, true);
+ }
@Test
public void testMPUwithLinkBucket() throws Exception {
@@ -481,6 +508,11 @@ public class TestOzoneAtRestEncryption {
public void testMultipartUploadWithEncryption(OzoneBucket bucket,
int numParts) throws Exception {
+ testMultipartUploadWithEncryption(bucket, numParts, false);
+ }
+
+ public void testMultipartUploadWithEncryption(OzoneBucket bucket,
+ int numParts, boolean isStream) throws Exception {
String keyName = "mpu_test_key_" + numParts;
// Initiate multipart upload
@@ -499,7 +531,14 @@ public class TestOzoneAtRestEncryption {
int partSize = (MPU_PART_MIN_SIZE * i) +
RANDOM.nextInt(DEFAULT_CRYPTO_BUFFER_SIZE - 1) + 1;
byte[] data = generateRandomData(partSize);
- String partName = uploadPart(bucket, keyName, uploadID, i, data);
+
+ String partName;
+ if (isStream) {
+ partName = uploadStreamPart(bucket, keyName, uploadID, i, data);
+ } else {
+ partName = uploadPart(bucket, keyName, uploadID, i, data);
+ }
+
partsMap.put(i, partName);
partsData.add(data);
keySize += data.length;
@@ -581,6 +620,26 @@ public class TestOzoneAtRestEncryption {
return uploadID;
}
+ private String uploadStreamPart(OzoneBucket bucket, String keyName,
+ String uploadID, int partNumber, byte[] data) throws Exception {
+ final int length = data.length;
+
+ OzoneDataStreamOutput multipartStreamKey =
+ bucket.createMultipartStreamKey(keyName,
+ length, partNumber, uploadID);
+
+ ByteBuffer dataBuffer = ByteBuffer.wrap(data);
+ multipartStreamKey.write(dataBuffer, 0, length);
+ multipartStreamKey.close();
+
+ OmMultipartCommitUploadPartInfo omMultipartCommitUploadPartInfo =
+ multipartStreamKey.getCommitUploadPartInfo();
+
+ Assert.assertNotNull(omMultipartCommitUploadPartInfo);
+ Assert.assertNotNull(omMultipartCommitUploadPartInfo.getPartName());
+ return omMultipartCommitUploadPartInfo.getPartName();
+ }
+
private String uploadPart(OzoneBucket bucket, String keyName,
String uploadID, int partNumber, byte[] data) throws Exception {
OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]