This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new cb35e7ed1c HDDS-8638. Support CryptoOutputStream and CipherOutputStream in createStreamKey (#4733)
cb35e7ed1c is described below

commit cb35e7ed1c7fb2eef71c2b8a4ff5624c322c5511
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed May 24 08:12:30 2023 +0800

    HDDS-8638. Support CryptoOutputStream and CipherOutputStream in createStreamKey (#4733)
---
 .../hdds/scm/storage/AbstractDataStreamOutput.java |  9 +--
 .../hdds/scm/storage/ByteBufferStreamOutput.java   | 26 +++++----
 .../ozone/client/io/ByteArrayStreamOutput.java     | 65 ++++++++++++++++++++++
 .../ozone/client/io/ByteBufferOutputStream.java    | 46 +++++++++++++++
 .../ozone/client/io/KeyDataStreamOutput.java       |  8 +--
 .../hadoop/ozone/client/io/KeyOutputStream.java    | 24 +-------
 .../ozone/client/io/OzoneDataStreamOutput.java     |  3 +-
 .../hadoop/ozone/client/io/OzoneOutputStream.java  |  3 +-
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  | 54 +++++++++---------
 .../dist/src/main/smoketest/gdpr/gdpr.robot        |  5 ++
 .../client/rpc/TestOzoneAtRestEncryption.java      | 43 ++++++++++----
 .../hadoop/fs/ozone/OzoneFSDataStreamOutput.java   | 33 +----------
 12 files changed, 200 insertions(+), 119 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java
index cad1d04792..412fb368a8 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java
@@ -18,11 +18,11 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.client.io.ByteBufferOutputStream;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
 
@@ -35,7 +35,7 @@ import java.util.Objects;
  * This class is used for error handling methods.
  */
 public abstract class AbstractDataStreamOutput
-    implements ByteBufferStreamOutput {
+    extends ByteBufferOutputStream {
 
   private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
   private int retryCount;
@@ -48,11 +48,6 @@ public abstract class AbstractDataStreamOutput
     this.retryCount = 0;
   }
 
-  @VisibleForTesting
-  public int getRetryCount() {
-    return retryCount;
-  }
-
   protected void resetRetryCount() {
     retryCount = 0;
   }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferStreamOutput.java
index 0650a685b6..b213bb1f4c 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferStreamOutput.java
@@ -23,33 +23,35 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 /**
-* This interface is for writing an output stream of ByteBuffers.
-* An ByteBufferStreamOutput accepts nio ByteBuffer and sends them to some sink.
-*/
+ * This interface is similar to {@link java.io.OutputStream}
+ * except that this class support {@link ByteBuffer} instead of byte[].
+ */
 public interface ByteBufferStreamOutput extends Closeable {
   /**
-   * Try to write all the bytes in ByteBuf b to DataStream.
+   * Similar to {@link java.io.OutputStream#write(byte[])},
+   * except that the parameter of this method is a {@link ByteBuffer}.
    *
-   * @param b the data.
+   * @param buffer the buffer containing data to be written.
    * @exception IOException if an I/O error occurs.
    */
-  default void write(ByteBuffer b) throws IOException {
+  default void write(ByteBuffer buffer) throws IOException {
+    final ByteBuffer b = buffer.asReadOnlyBuffer();
     write(b, b.position(), b.remaining());
   }
 
   /**
-   * Try to write the [off:off + len) slice in ByteBuf b to DataStream.
+   * Similar to {@link java.io.OutputStream#write(byte[], int, int)},
+   * except that the parameter of this method is a {@link ByteBuffer}.
    *
-   * @param b the data.
-   * @param off the start offset in the data.
+   * @param buffer the buffer containing data to be written.
+   * @param off the start offset.
    * @param len the number of bytes to write.
    * @exception  IOException  if an I/O error occurs.
    */
-  void write(ByteBuffer b, int off, int len) throws IOException;
+  void write(ByteBuffer buffer, int off, int len) throws IOException;
 
   /**
-   * Flushes this DataStream output and forces any buffered output bytes
-   * to be written out.
+   * Flush this output and force any buffered output bytes to be written out.
    *
    * @exception  IOException  if an I/O error occurs.
    */
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ByteArrayStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ByteArrayStreamOutput.java
new file mode 100644
index 0000000000..7872701de7
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ByteArrayStreamOutput.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * An {@link OutputStream} supporting {@link ByteBufferStreamOutput}.
+ * The subclass classes of this class must implement and optimize
+ * the {@link OutputStream#write(byte[], int, int)} method
+ * and optionally override
+ * the {@link ByteBufferStreamOutput#write(ByteBuffer, int, int)} method.
+ */
+public abstract class ByteArrayStreamOutput extends OutputStream
+    implements ByteBufferStreamOutput {
+  private static final int ARRAY_SIZE_LIMIT = 64 << 10; //64KB
+
+  @Override
+  public void write(ByteBuffer buffer, int off, int len) throws IOException {
+    if (len == 0) {
+      return;
+    }
+
+    if (buffer.hasArray()) {
+      write(buffer.array(), off, len);
+    } else {
+      final byte[] array = new byte[Math.min(ARRAY_SIZE_LIMIT, len)];
+      for (; len > 0;) {
+        final ByteBuffer readonly = buffer.asReadOnlyBuffer();
+        final int writeSize = Math.min(array.length, len);
+        readonly.position(off);
+        off += writeSize;
+        readonly.limit(off);
+        readonly.get(array, 0, writeSize);
+        write(array, 0, writeSize);
+        len -= writeSize;
+      }
+    }
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    write(new byte[]{(byte) b});
+  }
+}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ByteBufferOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ByteBufferOutputStream.java
new file mode 100644
index 0000000000..cff7a8ecd3
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ByteBufferOutputStream.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link ByteBufferStreamOutput} supporting {@link OutputStream}.
+ * The subclass classes of this class must implement and optimize
+ * the {@link ByteBufferStreamOutput#write(ByteBuffer, int, int)} method
+ * and optionally override
+ * the {@link OutputStream#write(byte[], int, int)} method.
+ */
+public abstract class ByteBufferOutputStream extends OutputStream
+    implements ByteBufferStreamOutput {
+  @Override
+  public void write(@Nonnull byte[] byteArray) throws IOException {
+    write(ByteBuffer.wrap(byteArray));
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    write(new byte[]{(byte) b});
+  }
+}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index 59f943b4e3..edc76066b5 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -43,6 +43,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.List;
+import java.util.UUID;
 
 /**
  * Maintaining a list of BlockInputStream. Write based on offset.
@@ -407,7 +408,7 @@ public class KeyDataStreamOutput extends AbstractDataStreamOutput {
     private XceiverClientFactory xceiverManager;
     private OzoneManagerProtocol omClient;
     private int chunkSize;
-    private String requestID;
+    private final String requestID = UUID.randomUUID().toString();
     private String multipartUploadID;
     private int multipartNumber;
     private boolean isMultipartKey;
@@ -445,11 +446,6 @@ public class KeyDataStreamOutput extends AbstractDataStreamOutput {
       return this;
     }
 
-    public Builder setRequestID(String id) {
-      this.requestID = id;
-      return this;
-    }
-
     public Builder setIsMultipartKey(boolean isMultipart) {
       this.isMultipartKey = isMultipart;
       return this;
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index c88c61c2f3..2eea676561 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -23,6 +23,7 @@ import java.io.OutputStream;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -93,8 +94,6 @@ public class KeyOutputStream extends OutputStream implements Syncable {
 
   private long clientID;
 
-  private OzoneManagerProtocol omClient;
-
   public KeyOutputStream(ReplicationConfig replicationConfig,
       ContainerClientMetrics clientMetrics) {
     this.replication = replicationConfig;
@@ -138,7 +137,7 @@ public class KeyOutputStream extends OutputStream implements Syncable {
       OzoneClientConfig config,
       OpenKeySession handler,
       XceiverClientFactory xceiverClientManager,
-      OzoneManagerProtocol omClient, int chunkSize,
+      OzoneManagerProtocol omClient,
       String requestId, ReplicationConfig replicationConfig,
       String uploadID, int partNumber, boolean isMultipart,
       boolean unsafeByteBufferConversion,
@@ -163,7 +162,6 @@ public class KeyOutputStream extends OutputStream implements Syncable {
     this.isException = false;
     this.writeOffset = 0;
     this.clientID = handler.getId();
-    this.omClient = omClient;
   }
 
   /**
@@ -579,8 +577,7 @@ public class KeyOutputStream extends OutputStream implements Syncable {
     private OpenKeySession openHandler;
     private XceiverClientFactory xceiverManager;
     private OzoneManagerProtocol omClient;
-    private int chunkSize;
-    private String requestID;
+    private final String requestID = UUID.randomUUID().toString();
     private String multipartUploadID;
     private int multipartNumber;
     private boolean isMultipartKey;
@@ -634,24 +631,10 @@ public class KeyOutputStream extends OutputStream implements Syncable {
       return this;
     }
 
-    public int getChunkSize() {
-      return chunkSize;
-    }
-
-    public Builder setChunkSize(int size) {
-      this.chunkSize = size;
-      return this;
-    }
-
     public String getRequestID() {
       return requestID;
     }
 
-    public Builder setRequestID(String id) {
-      this.requestID = id;
-      return this;
-    }
-
     public boolean isMultipartKey() {
       return isMultipartKey;
     }
@@ -703,7 +686,6 @@ public class KeyOutputStream extends OutputStream implements Syncable {
           openHandler,
           xceiverManager,
           omClient,
-          chunkSize,
           requestID,
           replicationConfig,
           multipartUploadID,
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
index d40ac2b332..f937689e90 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
@@ -24,9 +24,8 @@ import java.nio.ByteBuffer;
 
 /**
  * OzoneDataStreamOutput is used to write data into Ozone.
- * It uses SCM's {@link KeyDataStreamOutput} for writing the data.
  */
-public class OzoneDataStreamOutput implements ByteBufferStreamOutput {
+public class OzoneDataStreamOutput extends ByteBufferOutputStream {
 
   private final ByteBufferStreamOutput byteBufferStreamOutput;
 
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
index fe5955c0db..8a50ee2c2d 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
@@ -28,9 +28,8 @@ import java.util.Optional;
 
 /**
  * OzoneOutputStream is used to write data into Ozone.
- * It uses SCM's {@link KeyOutputStream} for writing the data.
  */
-public class OzoneOutputStream extends OutputStream {
+public class OzoneOutputStream extends ByteArrayStreamOutput {
 
   private final OutputStream outputStream;
   private final Syncable syncable;
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 666fd0faa9..a4dbfb1828 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -23,6 +23,7 @@ import javax.crypto.Cipher;
 import javax.crypto.CipherInputStream;
 import javax.crypto.CipherOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.URI;
 import java.security.InvalidKeyException;
 import java.security.PrivilegedExceptionAction;
@@ -35,7 +36,6 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
@@ -49,6 +49,7 @@ import org.apache.hadoop.crypto.CryptoInputStream;
 import org.apache.hadoop.crypto.CryptoOutputStream;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
@@ -1247,7 +1248,6 @@ public class RpcClient implements ClientProtocol {
               this.conf.getObject(ReplicationConfigValidator.class);
       validator.validate(replicationConfig);
     }
-    String requestId = UUID.randomUUID().toString();
 
     OmKeyArgs.Builder builder = new OmKeyArgs.Builder()
         .setVolumeName(volumeName)
@@ -1260,7 +1260,7 @@ public class RpcClient implements ClientProtocol {
         .setLatestVersionLocation(getLatestVersionLocation);
 
     OpenKeySession openKey = ozoneManagerClient.openKey(builder.build());
-    return createOutputStream(openKey, requestId);
+    return createOutputStream(openKey);
   }
 
   @Override
@@ -1275,7 +1275,6 @@ public class RpcClient implements ClientProtocol {
       HddsClientUtils.verifyKeyName(keyName);
     }
     HddsClientUtils.checkNotNull(keyName, replicationConfig);
-    String requestId = UUID.randomUUID().toString();
 
     OmKeyArgs.Builder builder = new OmKeyArgs.Builder()
         .setVolumeName(volumeName)
@@ -1287,7 +1286,7 @@ public class RpcClient implements ClientProtocol {
         .setAcls(getAclList());
 
     OpenKeySession openKey = ozoneManagerClient.openKey(builder.build());
-    return createDataStreamOutput(openKey, requestId);
+    return createDataStreamOutput(openKey);
   }
 
   private KeyProvider.KeyVersion getDEK(FileEncryptionInfo feInfo)
@@ -1666,7 +1665,6 @@ public class RpcClient implements ClientProtocol {
         "number should be greater than zero and less than or equal to 10000");
     Preconditions.checkArgument(size >= 0, "size should be greater than or " +
         "equal to zero");
-    String requestId = UUID.randomUUID().toString();
     OmKeyArgs keyArgs = new OmKeyArgs.Builder()
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
@@ -1679,7 +1677,7 @@ public class RpcClient implements ClientProtocol {
         .build();
 
     OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
-    KeyOutputStream keyOutputStream = createKeyOutputStream(openKey, requestId)
+    KeyOutputStream keyOutputStream = createKeyOutputStream(openKey)
         .setMultipartNumber(partNumber)
         .setMultipartUploadID(uploadID)
         .setIsMultipartKey(true)
@@ -1719,7 +1717,6 @@ public class RpcClient implements ClientProtocol {
         "number should be greater than zero and less than or equal to 10000");
     Preconditions.checkArgument(size >= 0, "size should be greater than or " +
         "equal to zero");
-    String requestId = UUID.randomUUID().toString();
 
     OmKeyArgs keyArgs = new OmKeyArgs.Builder()
         .setVolumeName(volumeName)
@@ -1739,7 +1736,6 @@ public class RpcClient implements ClientProtocol {
             .setHandler(openKey)
             .setXceiverClientManager(xceiverClientManager)
             .setOmClient(ozoneManagerClient)
-            .setRequestID(requestId)
             .setReplicationConfig(openKey.getKeyInfo().getReplicationConfig())
             .setMultipartNumber(partNumber)
             .setMultipartUploadID(uploadID)
@@ -1961,7 +1957,7 @@ public class RpcClient implements ClientProtocol {
         .build();
     OpenKeySession keySession =
         ozoneManagerClient.createFile(keyArgs, overWrite, recursive);
-    return createOutputStream(keySession, UUID.randomUUID().toString());
+    return createOutputStream(keySession);
   }
 
   private OmKeyArgs prepareOmKeyArgs(String volumeName, String bucketName,
@@ -1991,7 +1987,7 @@ public class RpcClient implements ClientProtocol {
         .build();
     OpenKeySession keySession =
         ozoneManagerClient.createFile(keyArgs, overWrite, recursive);
-    return createDataStreamOutput(keySession, UUID.randomUUID().toString());
+    return createDataStreamOutput(keySession);
   }
 
   @Override
@@ -2142,8 +2138,8 @@ public class RpcClient implements ClientProtocol {
           new MultipartInputStream(keyInfo.getKeyName(), cryptoInputStreams));
     }
   }
-  private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey,
-      String requestId) throws IOException {
+  private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey)
+      throws IOException {
     final ReplicationConfig replicationConfig
         = openKey.getKeyInfo().getReplicationConfig();
     KeyDataStreamOutput keyOutputStream =
@@ -2151,7 +2147,6 @@ public class RpcClient implements ClientProtocol {
             .setHandler(openKey)
             .setXceiverClientManager(xceiverClientManager)
             .setOmClient(ozoneManagerClient)
-            .setRequestID(requestId)
             .setReplicationConfig(replicationConfig)
             .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
             .setConfig(conf.getObject(OzoneClientConfig.class))
@@ -2159,18 +2154,26 @@ public class RpcClient implements ClientProtocol {
     keyOutputStream
         .addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(),
             openKey.getOpenVersion());
-    return new OzoneDataStreamOutput(keyOutputStream);
+    final OzoneOutputStream out = createSecureOutputStream(
+        openKey, keyOutputStream, null);
+    return new OzoneDataStreamOutput(out != null ? out : keyOutputStream);
   }
 
-  private OzoneOutputStream createOutputStream(OpenKeySession openKey,
-      String requestId) throws IOException {
+  private OzoneOutputStream createOutputStream(OpenKeySession openKey)
+      throws IOException {
 
-    KeyOutputStream keyOutputStream = createKeyOutputStream(openKey, requestId)
+    KeyOutputStream keyOutputStream = createKeyOutputStream(openKey)
         .build();
-
     keyOutputStream
         .addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(),
             openKey.getOpenVersion());
+    final OzoneOutputStream out = createSecureOutputStream(
+        openKey, keyOutputStream, keyOutputStream);
+    return out != null ? out : new OzoneOutputStream(keyOutputStream);
+  }
+
+  private OzoneOutputStream createSecureOutputStream(OpenKeySession openKey,
+      OutputStream keyOutputStream, Syncable syncable) throws IOException {
     final FileEncryptionInfo feInfo =
         openKey.getKeyInfo().getFileEncryptionInfo();
     if (feInfo != null) {
@@ -2185,20 +2188,18 @@ public class RpcClient implements ClientProtocol {
         final GDPRSymmetricKey gk = getGDPRSymmetricKey(
             openKey.getKeyInfo().getMetadata(), Cipher.ENCRYPT_MODE);
         if (gk != null) {
-          return new OzoneOutputStream(
-              new CipherOutputStream(keyOutputStream, gk.getCipher()),
-              keyOutputStream);
+          return new OzoneOutputStream(new CipherOutputStream(
+              keyOutputStream, gk.getCipher()), syncable);
         }
       }  catch (Exception ex) {
         throw new IOException(ex);
       }
-
-      return new OzoneOutputStream(keyOutputStream);
+      return null;
     }
   }
 
-  private KeyOutputStream.Builder createKeyOutputStream(OpenKeySession openKey,
-      String requestId) {
+  private KeyOutputStream.Builder createKeyOutputStream(
+      OpenKeySession openKey) {
     KeyOutputStream.Builder builder;
 
     ReplicationConfig replicationConfig =
@@ -2216,7 +2217,6 @@ public class RpcClient implements ClientProtocol {
     return builder.setHandler(openKey)
         .setXceiverClientManager(xceiverClientManager)
         .setOmClient(ozoneManagerClient)
-        .setRequestID(requestId)
         .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
         .setConfig(conf.getObject(OzoneClientConfig.class))
         .setClientMetrics(clientMetrics);
diff --git a/hadoop-ozone/dist/src/main/smoketest/gdpr/gdpr.robot b/hadoop-ozone/dist/src/main/smoketest/gdpr/gdpr.robot
index 1078d95540..366010e031 100644
--- a/hadoop-ozone/dist/src/main/smoketest/gdpr/gdpr.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/gdpr/gdpr.robot
@@ -66,6 +66,11 @@ Test GDPR with --enforcegdpr=true
     ${result} =     Execute             ozone sh key info /${volume}/mybucket2/mykey | jq -r '. | select(.name=="mykey") | .metadata | .gdprEnabled'
                     Should Be Equal     ${result}       true
                     Execute             ozone sh key delete /${volume}/mybucket2/mykey
+                    Execute             ozone sh key put --stream /${volume}/mybucket2/myStreamKey /opt/hadoop/NOTICE.txt
+                    Execute             rm -f NOTICE.txt.1
+    ${result} =     Execute             ozone sh key info /${volume}/mybucket2/myStreamKey | jq -r '. | select(.name=="myStreamKey") | .metadata | .gdprEnabled'
+                    Should Be Equal     ${result}       true
+                    Execute             ozone sh key delete /${volume}/mybucket2/myStreamKey
 
 Test GDPR with -g=true
     [arguments]     ${volume}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
index 8d8667051f..bbf459f426 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.hdds.scm.storage.MultipartInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -210,6 +211,7 @@ public class TestOzoneAtRestEncryption {
     OzoneBucket bucket = volume.getBucket(bucketName);
 
     createAndVerifyKeyData(bucket);
+    createAndVerifyStreamKeyData(bucket);
   }
 
   @Test
@@ -227,27 +229,45 @@ public class TestOzoneAtRestEncryption {
         linkVolumeName, linkBucketName);
 
     createAndVerifyKeyData(linkBucket);
+    createAndVerifyStreamKeyData(linkBucket);
   }
 
+  static void createAndVerifyStreamKeyData(OzoneBucket bucket)
+      throws Exception {
+    Instant testStartTime = Instant.now();
+    String keyName = UUID.randomUUID().toString();
+    String value = "sample value";
+    try (OzoneDataStreamOutput out = bucket.createStreamKey(keyName,
+        value.getBytes(StandardCharsets.UTF_8).length,
+        ReplicationConfig.fromTypeAndFactor(RATIS, ONE),
+        new HashMap<>())) {
+      out.write(value.getBytes(StandardCharsets.UTF_8));
+    }
+    verifyKeyData(bucket, keyName, value, testStartTime);
+  }
 
-  private void createAndVerifyKeyData(OzoneBucket bucket) throws Exception {
+  static void createAndVerifyKeyData(OzoneBucket bucket) throws Exception {
     Instant testStartTime = Instant.now();
     String keyName = UUID.randomUUID().toString();
     String value = "sample value";
     try (OzoneOutputStream out = bucket.createKey(keyName,
         value.getBytes(StandardCharsets.UTF_8).length,
-        ReplicationType.RATIS,
-        ReplicationFactor.ONE, new HashMap<>())) {
+        ReplicationConfig.fromTypeAndFactor(RATIS, ONE),
+        new HashMap<>())) {
       out.write(value.getBytes(StandardCharsets.UTF_8));
     }
+    verifyKeyData(bucket, keyName, value, testStartTime);
+  }
 
+  static void verifyKeyData(OzoneBucket bucket, String keyName, String value,
+      Instant testStartTime) throws Exception {
     // Verify content.
     OzoneKeyDetails key = bucket.getKey(keyName);
     Assert.assertEquals(keyName, key.getName());
 
     // Check file encryption info is set,
     // if set key will use this encryption info and encrypt data.
-    Assert.assertTrue(key.getFileEncryptionInfo() != null);
+    Assert.assertNotNull(key.getFileEncryptionInfo());
 
     byte[] fileContent;
     int len = 0;
@@ -324,8 +344,8 @@ public class TestOzoneAtRestEncryption {
     keyMetadata.put(OzoneConsts.GDPR_FLAG, "true");
     try (OzoneOutputStream out = bucket.createKey(keyName,
         value.getBytes(StandardCharsets.UTF_8).length,
-        ReplicationType.RATIS,
-        ReplicationFactor.ONE, keyMetadata)) {
+        ReplicationConfig.fromTypeAndFactor(RATIS, ONE),
+        keyMetadata)) {
       out.write(value.getBytes(StandardCharsets.UTF_8));
     }
 
@@ -364,6 +384,7 @@ public class TestOzoneAtRestEncryption {
     }, 500, 100000);
     RepeatedOmKeyInfo deletedKeys =
         getMatchedKeyInfo(keyName, omMetadataManager);
+    Assert.assertNotNull(deletedKeys);
     Map<String, String> deletedKeyMetadata =
         deletedKeys.getOmKeyInfoList().get(0).getMetadata();
     Assert.assertFalse(deletedKeyMetadata.containsKey(OzoneConsts.GDPR_FLAG));
@@ -374,7 +395,7 @@ public class TestOzoneAtRestEncryption {
         deletedKeys.getOmKeyInfoList().get(0).getFileEncryptionInfo());
   }
 
-  private boolean verifyRatisReplication(String volumeName, String bucketName,
+  static boolean verifyRatisReplication(String volumeName, String bucketName,
       String keyName, ReplicationType type, ReplicationFactor factor)
       throws IOException {
     OmKeyArgs keyArgs = new OmKeyArgs.Builder()
@@ -461,8 +482,8 @@ public class TestOzoneAtRestEncryption {
     String keyName = "mpu_test_key_" + numParts;
 
     // Initiate multipart upload
-    String uploadID = initiateMultipartUpload(bucket, keyName, RATIS,
-        ONE);
+    String uploadID = initiateMultipartUpload(bucket, keyName,
+        ReplicationConfig.fromTypeAndFactor(RATIS, ONE));
 
     // Upload Parts
     Map<Integer, String> partsMap = new TreeMap<>();
@@ -548,10 +569,10 @@ public class TestOzoneAtRestEncryption {
   }
 
   private String initiateMultipartUpload(OzoneBucket bucket, String keyName,
-      ReplicationType replicationType, ReplicationFactor replicationFactor)
+      ReplicationConfig replicationConfig)
       throws Exception {
     OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
-        replicationType, replicationFactor);
+        replicationConfig);
 
     String uploadID = multipartInfo.getUploadID();
     Assert.assertNotNull(uploadID);
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSDataStreamOutput.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSDataStreamOutput.java
index 0d44f75d66..dcb917f2f9 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSDataStreamOutput.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSDataStreamOutput.java
@@ -18,16 +18,15 @@
 package org.apache.hadoop.fs.ozone;
 
 import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
+import org.apache.hadoop.ozone.client.io.ByteBufferOutputStream;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 /**
  * The ByteBuffer output stream for Ozone file system.
  */
-public class OzoneFSDataStreamOutput extends OutputStream
-    implements ByteBufferStreamOutput {
+public class OzoneFSDataStreamOutput extends ByteBufferOutputStream {
 
   private final ByteBufferStreamOutput byteBufferStreamOutput;
 
@@ -50,34 +49,6 @@ public class OzoneFSDataStreamOutput extends OutputStream
     byteBufferStreamOutput.write(b, off, len);
   }
 
-  @Override
-  public void write(byte[] b, int off, int len)
-      throws IOException {
-    write(ByteBuffer.wrap(b));
-  }
-
-  /**
-   * Writes the specified byte to this output stream. The general
-   * contract for <code>write</code> is that one byte is written
-   * to the output stream. The byte to be written is the eight
-   * low-order bits of the argument <code>b</code>. The 24
-   * high-order bits of <code>b</code> are ignored.
-   * <p>
-   * Subclasses of <code>OutputStream</code> must provide an
-   * implementation for this method.
-   *
-   * @param b the <code>byte</code>.
-   * @throws IOException if an I/O error occurs. In particular,
-   *                     an <code>IOException</code> may be thrown if the
-   *                     output stream has been closed.
-   */
-  @Override
-  public void write(int b) throws IOException {
-    byte[] singleBytes = new byte[1];
-    singleBytes[0] = (byte) b;
-    byteBufferStreamOutput.write(ByteBuffer.wrap(singleBytes));
-  }
-
   /**
    * Flushes this DataStream output and forces any buffered output bytes
    * to be written out.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to