This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new cb35e7ed1c HDDS-8638. Support CryptoOutputStream and
CipherOutputStream in createStreamKey (#4733)
cb35e7ed1c is described below
commit cb35e7ed1c7fb2eef71c2b8a4ff5624c322c5511
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed May 24 08:12:30 2023 +0800
HDDS-8638. Support CryptoOutputStream and CipherOutputStream in
createStreamKey (#4733)
---
.../hdds/scm/storage/AbstractDataStreamOutput.java | 9 +--
.../hdds/scm/storage/ByteBufferStreamOutput.java | 26 +++++----
.../ozone/client/io/ByteArrayStreamOutput.java | 65 ++++++++++++++++++++++
.../ozone/client/io/ByteBufferOutputStream.java | 46 +++++++++++++++
.../ozone/client/io/KeyDataStreamOutput.java | 8 +--
.../hadoop/ozone/client/io/KeyOutputStream.java | 24 +-------
.../ozone/client/io/OzoneDataStreamOutput.java | 3 +-
.../hadoop/ozone/client/io/OzoneOutputStream.java | 3 +-
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 54 +++++++++---------
.../dist/src/main/smoketest/gdpr/gdpr.robot | 5 ++
.../client/rpc/TestOzoneAtRestEncryption.java | 43 ++++++++++----
.../hadoop/fs/ozone/OzoneFSDataStreamOutput.java | 33 +----------
12 files changed, 200 insertions(+), 119 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java
index cad1d04792..412fb368a8 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java
@@ -18,11 +18,11 @@
package org.apache.hadoop.hdds.scm.storage;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.client.io.ByteBufferOutputStream;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
@@ -35,7 +35,7 @@ import java.util.Objects;
* This class is used for error handling methods.
*/
public abstract class AbstractDataStreamOutput
- implements ByteBufferStreamOutput {
+ extends ByteBufferOutputStream {
private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
private int retryCount;
@@ -48,11 +48,6 @@ public abstract class AbstractDataStreamOutput
this.retryCount = 0;
}
- @VisibleForTesting
- public int getRetryCount() {
- return retryCount;
- }
-
protected void resetRetryCount() {
retryCount = 0;
}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferStreamOutput.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferStreamOutput.java
index 0650a685b6..b213bb1f4c 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferStreamOutput.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferStreamOutput.java
@@ -23,33 +23,35 @@ import java.io.IOException;
import java.nio.ByteBuffer;
/**
-* This interface is for writing an output stream of ByteBuffers.
-* An ByteBufferStreamOutput accepts nio ByteBuffer and sends them to some sink.
-*/
+ * This interface is similar to {@link java.io.OutputStream}
+ * except that this class support {@link ByteBuffer} instead of byte[].
+ */
public interface ByteBufferStreamOutput extends Closeable {
/**
- * Try to write all the bytes in ByteBuf b to DataStream.
+ * Similar to {@link java.io.OutputStream#write(byte[])},
+ * except that the parameter of this method is a {@link ByteBuffer}.
*
- * @param b the data.
+ * @param buffer the buffer containing data to be written.
* @exception IOException if an I/O error occurs.
*/
- default void write(ByteBuffer b) throws IOException {
+ default void write(ByteBuffer buffer) throws IOException {
+ final ByteBuffer b = buffer.asReadOnlyBuffer();
write(b, b.position(), b.remaining());
}
/**
- * Try to write the [off:off + len) slice in ByteBuf b to DataStream.
+ * Similar to {@link java.io.OutputStream#write(byte[], int, int)},
+ * except that the parameter of this method is a {@link ByteBuffer}.
*
- * @param b the data.
- * @param off the start offset in the data.
+ * @param buffer the buffer containing data to be written.
+ * @param off the start offset.
* @param len the number of bytes to write.
* @exception IOException if an I/O error occurs.
*/
- void write(ByteBuffer b, int off, int len) throws IOException;
+ void write(ByteBuffer buffer, int off, int len) throws IOException;
/**
- * Flushes this DataStream output and forces any buffered output bytes
- * to be written out.
+ * Flush this output and force any buffered output bytes to be written out.
*
* @exception IOException if an I/O error occurs.
*/
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ByteArrayStreamOutput.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ByteArrayStreamOutput.java
new file mode 100644
index 0000000000..7872701de7
--- /dev/null
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ByteArrayStreamOutput.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * An {@link OutputStream} supporting {@link ByteBufferStreamOutput}.
+ * The subclass classes of this class must implement and optimize
+ * the {@link OutputStream#write(byte[], int, int)} method
+ * and optionally override
+ * the {@link ByteBufferStreamOutput#write(ByteBuffer, int, int)} method.
+ */
+public abstract class ByteArrayStreamOutput extends OutputStream
+ implements ByteBufferStreamOutput {
+ private static final int ARRAY_SIZE_LIMIT = 64 << 10; //64KB
+
+ @Override
+ public void write(ByteBuffer buffer, int off, int len) throws IOException {
+ if (len == 0) {
+ return;
+ }
+
+ if (buffer.hasArray()) {
+ write(buffer.array(), off, len);
+ } else {
+ final byte[] array = new byte[Math.min(ARRAY_SIZE_LIMIT, len)];
+ for (; len > 0;) {
+ final ByteBuffer readonly = buffer.asReadOnlyBuffer();
+ final int writeSize = Math.min(array.length, len);
+ readonly.position(off);
+ off += writeSize;
+ readonly.limit(off);
+ readonly.get(array, 0, writeSize);
+ write(array, 0, writeSize);
+ len -= writeSize;
+ }
+ }
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ write(new byte[]{(byte) b});
+ }
+}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ByteBufferOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ByteBufferOutputStream.java
new file mode 100644
index 0000000000..cff7a8ecd3
--- /dev/null
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ByteBufferOutputStream.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link ByteBufferStreamOutput} supporting {@link OutputStream}.
+ * The subclass classes of this class must implement and optimize
+ * the {@link ByteBufferStreamOutput#write(ByteBuffer, int, int)} method
+ * and optionally override
+ * the {@link OutputStream#write(byte[], int, int)} method.
+ */
+public abstract class ByteBufferOutputStream extends OutputStream
+ implements ByteBufferStreamOutput {
+ @Override
+ public void write(@Nonnull byte[] byteArray) throws IOException {
+ write(ByteBuffer.wrap(byteArray));
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ write(new byte[]{(byte) b});
+ }
+}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index 59f943b4e3..edc76066b5 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -43,6 +43,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
+import java.util.UUID;
/**
* Maintaining a list of BlockInputStream. Write based on offset.
@@ -407,7 +408,7 @@ public class KeyDataStreamOutput extends
AbstractDataStreamOutput {
private XceiverClientFactory xceiverManager;
private OzoneManagerProtocol omClient;
private int chunkSize;
- private String requestID;
+ private final String requestID = UUID.randomUUID().toString();
private String multipartUploadID;
private int multipartNumber;
private boolean isMultipartKey;
@@ -445,11 +446,6 @@ public class KeyDataStreamOutput extends
AbstractDataStreamOutput {
return this;
}
- public Builder setRequestID(String id) {
- this.requestID = id;
- return this;
- }
-
public Builder setIsMultipartKey(boolean isMultipart) {
this.isMultipartKey = isMultipart;
return this;
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index c88c61c2f3..2eea676561 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -23,6 +23,7 @@ import java.io.OutputStream;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -93,8 +94,6 @@ public class KeyOutputStream extends OutputStream implements
Syncable {
private long clientID;
- private OzoneManagerProtocol omClient;
-
public KeyOutputStream(ReplicationConfig replicationConfig,
ContainerClientMetrics clientMetrics) {
this.replication = replicationConfig;
@@ -138,7 +137,7 @@ public class KeyOutputStream extends OutputStream
implements Syncable {
OzoneClientConfig config,
OpenKeySession handler,
XceiverClientFactory xceiverClientManager,
- OzoneManagerProtocol omClient, int chunkSize,
+ OzoneManagerProtocol omClient,
String requestId, ReplicationConfig replicationConfig,
String uploadID, int partNumber, boolean isMultipart,
boolean unsafeByteBufferConversion,
@@ -163,7 +162,6 @@ public class KeyOutputStream extends OutputStream
implements Syncable {
this.isException = false;
this.writeOffset = 0;
this.clientID = handler.getId();
- this.omClient = omClient;
}
/**
@@ -579,8 +577,7 @@ public class KeyOutputStream extends OutputStream
implements Syncable {
private OpenKeySession openHandler;
private XceiverClientFactory xceiverManager;
private OzoneManagerProtocol omClient;
- private int chunkSize;
- private String requestID;
+ private final String requestID = UUID.randomUUID().toString();
private String multipartUploadID;
private int multipartNumber;
private boolean isMultipartKey;
@@ -634,24 +631,10 @@ public class KeyOutputStream extends OutputStream
implements Syncable {
return this;
}
- public int getChunkSize() {
- return chunkSize;
- }
-
- public Builder setChunkSize(int size) {
- this.chunkSize = size;
- return this;
- }
-
public String getRequestID() {
return requestID;
}
- public Builder setRequestID(String id) {
- this.requestID = id;
- return this;
- }
-
public boolean isMultipartKey() {
return isMultipartKey;
}
@@ -703,7 +686,6 @@ public class KeyOutputStream extends OutputStream
implements Syncable {
openHandler,
xceiverManager,
omClient,
- chunkSize,
requestID,
replicationConfig,
multipartUploadID,
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
index d40ac2b332..f937689e90 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
@@ -24,9 +24,8 @@ import java.nio.ByteBuffer;
/**
* OzoneDataStreamOutput is used to write data into Ozone.
- * It uses SCM's {@link KeyDataStreamOutput} for writing the data.
*/
-public class OzoneDataStreamOutput implements ByteBufferStreamOutput {
+public class OzoneDataStreamOutput extends ByteBufferOutputStream {
private final ByteBufferStreamOutput byteBufferStreamOutput;
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
index fe5955c0db..8a50ee2c2d 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
@@ -28,9 +28,8 @@ import java.util.Optional;
/**
* OzoneOutputStream is used to write data into Ozone.
- * It uses SCM's {@link KeyOutputStream} for writing the data.
*/
-public class OzoneOutputStream extends OutputStream {
+public class OzoneOutputStream extends ByteArrayStreamOutput {
private final OutputStream outputStream;
private final Syncable syncable;
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 666fd0faa9..a4dbfb1828 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -23,6 +23,7 @@ import javax.crypto.Cipher;
import javax.crypto.CipherInputStream;
import javax.crypto.CipherOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.net.URI;
import java.security.InvalidKeyException;
import java.security.PrivilegedExceptionAction;
@@ -35,7 +36,6 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
@@ -49,6 +49,7 @@ import org.apache.hadoop.crypto.CryptoInputStream;
import org.apache.hadoop.crypto.CryptoOutputStream;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
@@ -1247,7 +1248,6 @@ public class RpcClient implements ClientProtocol {
this.conf.getObject(ReplicationConfigValidator.class);
validator.validate(replicationConfig);
}
- String requestId = UUID.randomUUID().toString();
OmKeyArgs.Builder builder = new OmKeyArgs.Builder()
.setVolumeName(volumeName)
@@ -1260,7 +1260,7 @@ public class RpcClient implements ClientProtocol {
.setLatestVersionLocation(getLatestVersionLocation);
OpenKeySession openKey = ozoneManagerClient.openKey(builder.build());
- return createOutputStream(openKey, requestId);
+ return createOutputStream(openKey);
}
@Override
@@ -1275,7 +1275,6 @@ public class RpcClient implements ClientProtocol {
HddsClientUtils.verifyKeyName(keyName);
}
HddsClientUtils.checkNotNull(keyName, replicationConfig);
- String requestId = UUID.randomUUID().toString();
OmKeyArgs.Builder builder = new OmKeyArgs.Builder()
.setVolumeName(volumeName)
@@ -1287,7 +1286,7 @@ public class RpcClient implements ClientProtocol {
.setAcls(getAclList());
OpenKeySession openKey = ozoneManagerClient.openKey(builder.build());
- return createDataStreamOutput(openKey, requestId);
+ return createDataStreamOutput(openKey);
}
private KeyProvider.KeyVersion getDEK(FileEncryptionInfo feInfo)
@@ -1666,7 +1665,6 @@ public class RpcClient implements ClientProtocol {
"number should be greater than zero and less than or equal to 10000");
Preconditions.checkArgument(size >= 0, "size should be greater than or " +
"equal to zero");
- String requestId = UUID.randomUUID().toString();
OmKeyArgs keyArgs = new OmKeyArgs.Builder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
@@ -1679,7 +1677,7 @@ public class RpcClient implements ClientProtocol {
.build();
OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
- KeyOutputStream keyOutputStream = createKeyOutputStream(openKey, requestId)
+ KeyOutputStream keyOutputStream = createKeyOutputStream(openKey)
.setMultipartNumber(partNumber)
.setMultipartUploadID(uploadID)
.setIsMultipartKey(true)
@@ -1719,7 +1717,6 @@ public class RpcClient implements ClientProtocol {
"number should be greater than zero and less than or equal to 10000");
Preconditions.checkArgument(size >= 0, "size should be greater than or " +
"equal to zero");
- String requestId = UUID.randomUUID().toString();
OmKeyArgs keyArgs = new OmKeyArgs.Builder()
.setVolumeName(volumeName)
@@ -1739,7 +1736,6 @@ public class RpcClient implements ClientProtocol {
.setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
.setOmClient(ozoneManagerClient)
- .setRequestID(requestId)
.setReplicationConfig(openKey.getKeyInfo().getReplicationConfig())
.setMultipartNumber(partNumber)
.setMultipartUploadID(uploadID)
@@ -1961,7 +1957,7 @@ public class RpcClient implements ClientProtocol {
.build();
OpenKeySession keySession =
ozoneManagerClient.createFile(keyArgs, overWrite, recursive);
- return createOutputStream(keySession, UUID.randomUUID().toString());
+ return createOutputStream(keySession);
}
private OmKeyArgs prepareOmKeyArgs(String volumeName, String bucketName,
@@ -1991,7 +1987,7 @@ public class RpcClient implements ClientProtocol {
.build();
OpenKeySession keySession =
ozoneManagerClient.createFile(keyArgs, overWrite, recursive);
- return createDataStreamOutput(keySession, UUID.randomUUID().toString());
+ return createDataStreamOutput(keySession);
}
@Override
@@ -2142,8 +2138,8 @@ public class RpcClient implements ClientProtocol {
new MultipartInputStream(keyInfo.getKeyName(), cryptoInputStreams));
}
}
- private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey,
- String requestId) throws IOException {
+ private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey)
+ throws IOException {
final ReplicationConfig replicationConfig
= openKey.getKeyInfo().getReplicationConfig();
KeyDataStreamOutput keyOutputStream =
@@ -2151,7 +2147,6 @@ public class RpcClient implements ClientProtocol {
.setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
.setOmClient(ozoneManagerClient)
- .setRequestID(requestId)
.setReplicationConfig(replicationConfig)
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
.setConfig(conf.getObject(OzoneClientConfig.class))
@@ -2159,18 +2154,26 @@ public class RpcClient implements ClientProtocol {
keyOutputStream
.addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(),
openKey.getOpenVersion());
- return new OzoneDataStreamOutput(keyOutputStream);
+ final OzoneOutputStream out = createSecureOutputStream(
+ openKey, keyOutputStream, null);
+ return new OzoneDataStreamOutput(out != null ? out : keyOutputStream);
}
- private OzoneOutputStream createOutputStream(OpenKeySession openKey,
- String requestId) throws IOException {
+ private OzoneOutputStream createOutputStream(OpenKeySession openKey)
+ throws IOException {
- KeyOutputStream keyOutputStream = createKeyOutputStream(openKey, requestId)
+ KeyOutputStream keyOutputStream = createKeyOutputStream(openKey)
.build();
-
keyOutputStream
.addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(),
openKey.getOpenVersion());
+ final OzoneOutputStream out = createSecureOutputStream(
+ openKey, keyOutputStream, keyOutputStream);
+ return out != null ? out : new OzoneOutputStream(keyOutputStream);
+ }
+
+ private OzoneOutputStream createSecureOutputStream(OpenKeySession openKey,
+ OutputStream keyOutputStream, Syncable syncable) throws IOException {
final FileEncryptionInfo feInfo =
openKey.getKeyInfo().getFileEncryptionInfo();
if (feInfo != null) {
@@ -2185,20 +2188,18 @@ public class RpcClient implements ClientProtocol {
final GDPRSymmetricKey gk = getGDPRSymmetricKey(
openKey.getKeyInfo().getMetadata(), Cipher.ENCRYPT_MODE);
if (gk != null) {
- return new OzoneOutputStream(
- new CipherOutputStream(keyOutputStream, gk.getCipher()),
- keyOutputStream);
+ return new OzoneOutputStream(new CipherOutputStream(
+ keyOutputStream, gk.getCipher()), syncable);
}
} catch (Exception ex) {
throw new IOException(ex);
}
-
- return new OzoneOutputStream(keyOutputStream);
+ return null;
}
}
- private KeyOutputStream.Builder createKeyOutputStream(OpenKeySession openKey,
- String requestId) {
+ private KeyOutputStream.Builder createKeyOutputStream(
+ OpenKeySession openKey) {
KeyOutputStream.Builder builder;
ReplicationConfig replicationConfig =
@@ -2216,7 +2217,6 @@ public class RpcClient implements ClientProtocol {
return builder.setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
.setOmClient(ozoneManagerClient)
- .setRequestID(requestId)
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
.setConfig(conf.getObject(OzoneClientConfig.class))
.setClientMetrics(clientMetrics);
diff --git a/hadoop-ozone/dist/src/main/smoketest/gdpr/gdpr.robot
b/hadoop-ozone/dist/src/main/smoketest/gdpr/gdpr.robot
index 1078d95540..366010e031 100644
--- a/hadoop-ozone/dist/src/main/smoketest/gdpr/gdpr.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/gdpr/gdpr.robot
@@ -66,6 +66,11 @@ Test GDPR with --enforcegdpr=true
${result} = Execute ozone sh key info
/${volume}/mybucket2/mykey | jq -r '. | select(.name=="mykey") | .metadata |
.gdprEnabled'
Should Be Equal ${result} true
Execute ozone sh key delete
/${volume}/mybucket2/mykey
+ Execute ozone sh key put --stream
/${volume}/mybucket2/myStreamKey /opt/hadoop/NOTICE.txt
+ Execute rm -f NOTICE.txt.1
+ ${result} = Execute ozone sh key info
/${volume}/mybucket2/myStreamKey | jq -r '. | select(.name=="myStreamKey") |
.metadata | .gdprEnabled'
+ Should Be Equal ${result} true
+ Execute ozone sh key delete
/${volume}/mybucket2/myStreamKey
Test GDPR with -g=true
[arguments] ${volume}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
index 8d8667051f..bbf459f426 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.hdds.scm.storage.MultipartInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -210,6 +211,7 @@ public class TestOzoneAtRestEncryption {
OzoneBucket bucket = volume.getBucket(bucketName);
createAndVerifyKeyData(bucket);
+ createAndVerifyStreamKeyData(bucket);
}
@Test
@@ -227,27 +229,45 @@ public class TestOzoneAtRestEncryption {
linkVolumeName, linkBucketName);
createAndVerifyKeyData(linkBucket);
+ createAndVerifyStreamKeyData(linkBucket);
}
+ static void createAndVerifyStreamKeyData(OzoneBucket bucket)
+ throws Exception {
+ Instant testStartTime = Instant.now();
+ String keyName = UUID.randomUUID().toString();
+ String value = "sample value";
+ try (OzoneDataStreamOutput out = bucket.createStreamKey(keyName,
+ value.getBytes(StandardCharsets.UTF_8).length,
+ ReplicationConfig.fromTypeAndFactor(RATIS, ONE),
+ new HashMap<>())) {
+ out.write(value.getBytes(StandardCharsets.UTF_8));
+ }
+ verifyKeyData(bucket, keyName, value, testStartTime);
+ }
- private void createAndVerifyKeyData(OzoneBucket bucket) throws Exception {
+ static void createAndVerifyKeyData(OzoneBucket bucket) throws Exception {
Instant testStartTime = Instant.now();
String keyName = UUID.randomUUID().toString();
String value = "sample value";
try (OzoneOutputStream out = bucket.createKey(keyName,
value.getBytes(StandardCharsets.UTF_8).length,
- ReplicationType.RATIS,
- ReplicationFactor.ONE, new HashMap<>())) {
+ ReplicationConfig.fromTypeAndFactor(RATIS, ONE),
+ new HashMap<>())) {
out.write(value.getBytes(StandardCharsets.UTF_8));
}
+ verifyKeyData(bucket, keyName, value, testStartTime);
+ }
+ static void verifyKeyData(OzoneBucket bucket, String keyName, String value,
+ Instant testStartTime) throws Exception {
// Verify content.
OzoneKeyDetails key = bucket.getKey(keyName);
Assert.assertEquals(keyName, key.getName());
// Check file encryption info is set,
// if set key will use this encryption info and encrypt data.
- Assert.assertTrue(key.getFileEncryptionInfo() != null);
+ Assert.assertNotNull(key.getFileEncryptionInfo());
byte[] fileContent;
int len = 0;
@@ -324,8 +344,8 @@ public class TestOzoneAtRestEncryption {
keyMetadata.put(OzoneConsts.GDPR_FLAG, "true");
try (OzoneOutputStream out = bucket.createKey(keyName,
value.getBytes(StandardCharsets.UTF_8).length,
- ReplicationType.RATIS,
- ReplicationFactor.ONE, keyMetadata)) {
+ ReplicationConfig.fromTypeAndFactor(RATIS, ONE),
+ keyMetadata)) {
out.write(value.getBytes(StandardCharsets.UTF_8));
}
@@ -364,6 +384,7 @@ public class TestOzoneAtRestEncryption {
}, 500, 100000);
RepeatedOmKeyInfo deletedKeys =
getMatchedKeyInfo(keyName, omMetadataManager);
+ Assert.assertNotNull(deletedKeys);
Map<String, String> deletedKeyMetadata =
deletedKeys.getOmKeyInfoList().get(0).getMetadata();
Assert.assertFalse(deletedKeyMetadata.containsKey(OzoneConsts.GDPR_FLAG));
@@ -374,7 +395,7 @@ public class TestOzoneAtRestEncryption {
deletedKeys.getOmKeyInfoList().get(0).getFileEncryptionInfo());
}
- private boolean verifyRatisReplication(String volumeName, String bucketName,
+ static boolean verifyRatisReplication(String volumeName, String bucketName,
String keyName, ReplicationType type, ReplicationFactor factor)
throws IOException {
OmKeyArgs keyArgs = new OmKeyArgs.Builder()
@@ -461,8 +482,8 @@ public class TestOzoneAtRestEncryption {
String keyName = "mpu_test_key_" + numParts;
// Initiate multipart upload
- String uploadID = initiateMultipartUpload(bucket, keyName, RATIS,
- ONE);
+ String uploadID = initiateMultipartUpload(bucket, keyName,
+ ReplicationConfig.fromTypeAndFactor(RATIS, ONE));
// Upload Parts
Map<Integer, String> partsMap = new TreeMap<>();
@@ -548,10 +569,10 @@ public class TestOzoneAtRestEncryption {
}
private String initiateMultipartUpload(OzoneBucket bucket, String keyName,
- ReplicationType replicationType, ReplicationFactor replicationFactor)
+ ReplicationConfig replicationConfig)
throws Exception {
OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
- replicationType, replicationFactor);
+ replicationConfig);
String uploadID = multipartInfo.getUploadID();
Assert.assertNotNull(uploadID);
diff --git
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSDataStreamOutput.java
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSDataStreamOutput.java
index 0d44f75d66..dcb917f2f9 100644
---
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSDataStreamOutput.java
+++
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSDataStreamOutput.java
@@ -18,16 +18,15 @@
package org.apache.hadoop.fs.ozone;
import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
+import org.apache.hadoop.ozone.client.io.ByteBufferOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
import java.nio.ByteBuffer;
/**
* The ByteBuffer output stream for Ozone file system.
*/
-public class OzoneFSDataStreamOutput extends OutputStream
- implements ByteBufferStreamOutput {
+public class OzoneFSDataStreamOutput extends ByteBufferOutputStream {
private final ByteBufferStreamOutput byteBufferStreamOutput;
@@ -50,34 +49,6 @@ public class OzoneFSDataStreamOutput extends OutputStream
byteBufferStreamOutput.write(b, off, len);
}
- @Override
- public void write(byte[] b, int off, int len)
- throws IOException {
- write(ByteBuffer.wrap(b));
- }
-
- /**
- * Writes the specified byte to this output stream. The general
- * contract for <code>write</code> is that one byte is written
- * to the output stream. The byte to be written is the eight
- * low-order bits of the argument <code>b</code>. The 24
- * high-order bits of <code>b</code> are ignored.
- * <p>
- * Subclasses of <code>OutputStream</code> must provide an
- * implementation for this method.
- *
- * @param b the <code>byte</code>.
- * @throws IOException if an I/O error occurs. In particular,
- * an <code>IOException</code> may be thrown if the
- * output stream has been closed.
- */
- @Override
- public void write(int b) throws IOException {
- byte[] singleBytes = new byte[1];
- singleBytes[0] = (byte) b;
- byteBufferStreamOutput.write(ByteBuffer.wrap(singleBytes));
- }
-
/**
* Flushes this DataStream output and forces any buffered output bytes
* to be written out.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]