This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new b9496f9eaf HDDS-5892. RpcClient#createMultipartStreamKey method, need 
to support file encrypt (#4779)
b9496f9eaf is described below

commit b9496f9eaf0b9239fcd8b0084d859b4501ccc192
Author: hao guo <[email protected]>
AuthorDate: Thu Jun 22 01:07:44 2023 +0800

    HDDS-5892. RpcClient#createMultipartStreamKey method, need to support file 
encrypt (#4779)
---
 .../ozone/client/io/OzoneDataStreamOutput.java     | 19 ++++-
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  | 82 ++++++----------------
 .../client/rpc/TestOzoneAtRestEncryption.java      | 61 +++++++++++++++-
 3 files changed, 99 insertions(+), 63 deletions(-)

diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
index f937689e90..a1e4731a56 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
@@ -16,10 +16,12 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import org.apache.hadoop.crypto.CryptoOutputStream;
 import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 /**
@@ -55,9 +57,20 @@ public class OzoneDataStreamOutput extends 
ByteBufferOutputStream {
   }
 
   public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
-    if (byteBufferStreamOutput instanceof KeyDataStreamOutput) {
-      return ((KeyDataStreamOutput)
-              byteBufferStreamOutput).getCommitUploadPartInfo();
+    if (byteBufferStreamOutput instanceof OzoneOutputStream) {
+      OutputStream outputStream =
+          ((OzoneOutputStream) byteBufferStreamOutput).getOutputStream();
+      if (outputStream instanceof KeyDataStreamOutput) {
+        return ((KeyDataStreamOutput)
+            outputStream).getCommitUploadPartInfo();
+      } else if (outputStream instanceof CryptoOutputStream) {
+        OutputStream wrappedStream =
+            ((CryptoOutputStream) outputStream).getWrappedStream();
+        if (wrappedStream instanceof KeyDataStreamOutput) {
+          return ((KeyDataStreamOutput) wrappedStream)
+              .getCommitUploadPartInfo();
+        }
+      }
     }
     // Otherwise return null.
     return null;
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index f8fd36d465..166efc842d 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -1670,14 +1670,9 @@ public class RpcClient implements ClientProtocol {
     return multipartInfo;
   }
 
-  @Override
-  public OzoneOutputStream createMultipartKey(String volumeName,
-                                              String bucketName,
-                                              String keyName,
-                                              long size,
-                                              int partNumber,
-                                              String uploadID)
-      throws IOException {
+  private OpenKeySession newMultipartOpenKey(
+      String volumeName, String bucketName, String keyName,
+      long size, int partNumber, String uploadID) throws IOException {
     verifyVolumeName(volumeName);
     verifyBucketName(bucketName);
     if (checkKeyNameEnabled) {
@@ -1698,27 +1693,21 @@ public class RpcClient implements ClientProtocol {
         .setMultipartUploadPartNumber(partNumber)
         .setAcls(getAclList())
         .build();
+    return ozoneManagerClient.openKey(keyArgs);
+  }
 
-    OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
+  @Override
+  public OzoneOutputStream createMultipartKey(
+      String volumeName, String bucketName, String keyName,
+      long size, int partNumber, String uploadID) throws IOException {
+    final OpenKeySession openKey = newMultipartOpenKey(
+        volumeName, bucketName, keyName, size, partNumber, uploadID);
     KeyOutputStream keyOutputStream = createKeyOutputStream(openKey)
         .setMultipartNumber(partNumber)
         .setMultipartUploadID(uploadID)
         .setIsMultipartKey(true)
         .build();
-    keyOutputStream.addPreallocateBlocks(
-        openKey.getKeyInfo().getLatestVersionLocations(),
-        openKey.getOpenVersion());
-    FileEncryptionInfo feInfo = openKey.getKeyInfo().getFileEncryptionInfo();
-    if (feInfo != null) {
-      KeyProvider.KeyVersion decrypted = getDEK(feInfo);
-      final CryptoOutputStream cryptoOut =
-          new CryptoOutputStream(keyOutputStream,
-              OzoneKMSUtil.getCryptoCodec(conf, feInfo),
-              decrypted.getMaterial(), feInfo.getIV());
-      return new OzoneOutputStream(cryptoOut);
-    } else {
-      return new OzoneOutputStream(keyOutputStream);
-    }
+    return createOutputStream(openKey, keyOutputStream);
   }
 
   @Override
@@ -1730,29 +1719,8 @@ public class RpcClient implements ClientProtocol {
       int partNumber,
       String uploadID)
       throws IOException {
-    verifyVolumeName(volumeName);
-    verifyBucketName(bucketName);
-    if (checkKeyNameEnabled) {
-      HddsClientUtils.verifyKeyName(keyName);
-    }
-    HddsClientUtils.checkNotNull(keyName, uploadID);
-    Preconditions.checkArgument(partNumber > 0 && partNumber <= 10000, "Part " 
+
-        "number should be greater than zero and less than or equal to 10000");
-    Preconditions.checkArgument(size >= 0, "size should be greater than or " +
-        "equal to zero");
-
-    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
-        .setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setKeyName(keyName)
-        .setDataSize(size)
-        .setIsMultipartKey(true)
-        .setMultipartUploadID(uploadID)
-        .setMultipartUploadPartNumber(partNumber)
-        .setAcls(getAclList())
-        .build();
-
-    OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
+    final OpenKeySession openKey = newMultipartOpenKey(
+        volumeName, bucketName, keyName, size, partNumber, uploadID);
 
     KeyDataStreamOutput keyOutputStream =
         new KeyDataStreamOutput.Builder()
@@ -1770,17 +1738,9 @@ public class RpcClient implements ClientProtocol {
         .addPreallocateBlocks(
             openKey.getKeyInfo().getLatestVersionLocations(),
             openKey.getOpenVersion());
-
-    FileEncryptionInfo feInfo = openKey.getKeyInfo().getFileEncryptionInfo();
-    if (feInfo != null) {
-      // todo: need to support file encrypt,
-      //  https://issues.apache.org/jira/browse/HDDS-5892
-      throw new UnsupportedOperationException(
-          "FileEncryptionInfo is not yet supported in " +
-              "createMultipartStreamKey");
-    } else {
-      return new OzoneDataStreamOutput(keyOutputStream);
-    }
+    final OzoneOutputStream out = createSecureOutputStream(
+        openKey, keyOutputStream, null);
+    return new OzoneDataStreamOutput(out != null ? out : keyOutputStream);
   }
 
   @Override
@@ -2181,12 +2141,16 @@ public class RpcClient implements ClientProtocol {
         openKey, keyOutputStream, null);
     return new OzoneDataStreamOutput(out != null ? out : keyOutputStream);
   }
-
   private OzoneOutputStream createOutputStream(OpenKeySession openKey)
       throws IOException {
-
     KeyOutputStream keyOutputStream = createKeyOutputStream(openKey)
         .build();
+    return createOutputStream(openKey, keyOutputStream);
+  }
+
+  private OzoneOutputStream createOutputStream(OpenKeySession openKey,
+      KeyOutputStream keyOutputStream)
+      throws IOException {
     keyOutputStream
         .addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(),
             openKey.getOpenVersion());
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
index f37cf405f6..c6698c663c 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.client.rpc;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.security.NoSuchAlgorithmException;
 import java.time.Instant;
@@ -465,6 +466,32 @@ public class TestOzoneAtRestEncryption {
     testMultipartUploadWithEncryption(bucket, 3);
   }
 
+  @Test
+  public void testMPUwithOneStreamPart() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    OzoneBucket bucket = createVolumeAndBucket(volumeName, bucketName);
+    testMultipartUploadWithEncryption(bucket, 1, true);
+  }
+
+  @Test
+  public void testMPUwithTwoStreamParts() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    OzoneBucket bucket = createVolumeAndBucket(volumeName, bucketName);
+    testMultipartUploadWithEncryption(bucket, 2, true);
+  }
+
+  @Test
+  public void testMPUwithThreeStreamPartsOverride() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    OzoneBucket bucket = createVolumeAndBucket(volumeName, bucketName);
+    testMultipartUploadWithEncryption(bucket, 3);
+
+    // override the key and check content
+    testMultipartUploadWithEncryption(bucket, 3, true);
+  }
 
   @Test
   public void testMPUwithLinkBucket() throws Exception {
@@ -481,6 +508,11 @@ public class TestOzoneAtRestEncryption {
 
   public void testMultipartUploadWithEncryption(OzoneBucket bucket,
       int numParts) throws Exception {
+    testMultipartUploadWithEncryption(bucket, numParts, false);
+  }
+
+  public void testMultipartUploadWithEncryption(OzoneBucket bucket,
+      int numParts, boolean isStream) throws Exception {
     String keyName = "mpu_test_key_" + numParts;
 
     // Initiate multipart upload
@@ -499,7 +531,14 @@ public class TestOzoneAtRestEncryption {
       int partSize = (MPU_PART_MIN_SIZE * i) +
           RANDOM.nextInt(DEFAULT_CRYPTO_BUFFER_SIZE - 1) + 1;
       byte[] data = generateRandomData(partSize);
-      String partName = uploadPart(bucket, keyName, uploadID, i, data);
+
+      String partName;
+      if (isStream) {
+        partName = uploadStreamPart(bucket, keyName, uploadID, i, data);
+      } else {
+        partName = uploadPart(bucket, keyName, uploadID, i, data);
+      }
+
       partsMap.put(i, partName);
       partsData.add(data);
       keySize += data.length;
@@ -581,6 +620,26 @@ public class TestOzoneAtRestEncryption {
     return uploadID;
   }
 
+  private String uploadStreamPart(OzoneBucket bucket, String keyName,
+      String uploadID, int partNumber, byte[] data) throws Exception {
+    final int length = data.length;
+
+    OzoneDataStreamOutput multipartStreamKey =
+        bucket.createMultipartStreamKey(keyName,
+            length, partNumber, uploadID);
+
+    ByteBuffer dataBuffer = ByteBuffer.wrap(data);
+    multipartStreamKey.write(dataBuffer, 0, length);
+    multipartStreamKey.close();
+
+    OmMultipartCommitUploadPartInfo omMultipartCommitUploadPartInfo =
+        multipartStreamKey.getCommitUploadPartInfo();
+
+    Assert.assertNotNull(omMultipartCommitUploadPartInfo);
+    Assert.assertNotNull(omMultipartCommitUploadPartInfo.getPartName());
+    return omMultipartCommitUploadPartInfo.getPartName();
+  }
+
   private String uploadPart(OzoneBucket bucket, String keyName,
       String uploadID, int partNumber, byte[] data) throws Exception {
     OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to