This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new eb7e8d009e HDDS-9668. Zero-Copy in EC GRPC (write) (#5621)
eb7e8d009e is described below
commit eb7e8d009e701d1dc7a792deca1b5d6893d051c0
Author: Duong Nguyen <[email protected]>
AuthorDate: Mon Dec 4 01:10:39 2023 -0800
HDDS-9668. Zero-Copy in EC GRPC (write) (#5621)
---
.../org/apache/hadoop/ozone/OzoneConfigKeys.java | 4 +
.../common/src/main/resources/ozone-default.xml | 8 +
.../transport/server/GrpcXceiverService.java | 74 +++-
.../common/transport/server/XceiverServerGrpc.java | 11 +-
...eam.java => AbstractTestECKeyOutputStream.java} | 12 +-
.../ozone/client/rpc/TestECKeyOutputStream.java | 432 +--------------------
.../rpc/TestECKeyOutputStreamWithZeroCopy.java | 31 ++
7 files changed, 134 insertions(+), 438 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 0b62b887a3..89c9be5467 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -188,6 +188,10 @@ public final class OzoneConfigKeys {
"ozone.client.ec.grpc.write.timeout";
public static final String OZONE_CLIENT_EC_GRPC_WRITE_TIMEOUT_DEFAULT =
"30s";
+ public static final String OZONE_EC_GRPC_ZERO_COPY_ENABLED =
+ "ozone.ec.grpc.zerocopy.enabled";
+ public static final boolean OZONE_EC_GRPC_ZERO_COPY_ENABLED_DEFAULT = true;
+
/**
* Ozone administrator users delimited by comma.
* If not set, only the user who launches an ozone service will be the
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 65019796b8..7d8f538178 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -4248,4 +4248,12 @@
to existing buckets till this operation is completed.
</description>
</property>
+ <property>
+ <name>ozone.ec.grpc.zerocopy.enabled</name>
+ <value>true</value>
+ <tag>OZONE, DATANODE</tag>
+ <description>
+      Specify whether zero-copy should be enabled for the EC gRPC protocol.
+ </description>
+ </property>
</configuration>
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
index 023786fdb0..9c3f29d0f0 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
@@ -17,19 +17,25 @@
*/
package org.apache.hadoop.ozone.container.common.transport.server;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto
- .XceiverClientProtocolServiceGrpc;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
+import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller;
+import org.apache.ratis.thirdparty.com.google.protobuf.MessageLite;
+import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor;
+import org.apache.ratis.thirdparty.io.grpc.ServerCallHandler;
+import org.apache.ratis.thirdparty.io.grpc.ServerServiceDefinition;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;
+import static
org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.getSendMethod;
+
/**
* Grpc Service for handling Container Commands on datanode.
*/
@@ -39,9 +45,58 @@ public class GrpcXceiverService extends
LOG = LoggerFactory.getLogger(GrpcXceiverService.class);
private final ContainerDispatcher dispatcher;
+ private final boolean zeroCopyEnabled;
+ private final ZeroCopyMessageMarshaller<ContainerCommandRequestProto>
+ zeroCopyMessageMarshaller = new ZeroCopyMessageMarshaller<>(
+ ContainerCommandRequestProto.getDefaultInstance());
- public GrpcXceiverService(ContainerDispatcher dispatcher) {
+ public GrpcXceiverService(ContainerDispatcher dispatcher,
+ boolean zeroCopyEnabled) {
this.dispatcher = dispatcher;
+ this.zeroCopyEnabled = zeroCopyEnabled;
+ }
+
+  /**
+   * Binds the service, equipping the {@code send} API with a zero-copy
+   * marshaller when zero-copy is enabled.
+ * @return service definition.
+ */
+ public ServerServiceDefinition bindServiceWithZeroCopy() {
+ ServerServiceDefinition orig = super.bindService();
+ if (!zeroCopyEnabled) {
+ LOG.info("Zerocopy is not enabled.");
+ return orig;
+ }
+
+ ServerServiceDefinition.Builder builder =
+ ServerServiceDefinition.builder(orig.getServiceDescriptor().getName());
+ // Add `send` method with zerocopy marshaller.
+ addZeroCopyMethod(orig, builder, getSendMethod(),
+ zeroCopyMessageMarshaller);
+ // Add other methods as is.
+ orig.getMethods().stream().filter(
+ x -> !x.getMethodDescriptor().getFullMethodName().equals(
+ getSendMethod().getFullMethodName())
+ ).forEach(
+ builder::addMethod
+ );
+
+ return builder.build();
+ }
+
+ private static <Req extends MessageLite, Resp> void addZeroCopyMethod(
+ ServerServiceDefinition orig,
+ ServerServiceDefinition.Builder newServiceBuilder,
+ MethodDescriptor<Req, Resp> origMethod,
+ ZeroCopyMessageMarshaller<Req> zeroCopyMarshaller) {
+ MethodDescriptor<Req, Resp> newMethod = origMethod.toBuilder()
+ .setRequestMarshaller(zeroCopyMarshaller)
+ .build();
+ @SuppressWarnings("unchecked")
+ ServerCallHandler<Req, Resp> serverCallHandler =
+ (ServerCallHandler<Req, Resp>) orig.getMethod(
+ newMethod.getFullMethodName()).getServerCallHandler();
+ newServiceBuilder.addMethod(newMethod, serverCallHandler);
}
@Override
@@ -61,6 +116,11 @@ public class GrpcXceiverService extends
+ " ContainerCommandRequestProto {}", request, e);
isClosed.set(true);
responseObserver.onError(e);
+ } finally {
+ InputStream popStream = zeroCopyMessageMarshaller.popStream(request);
+ if (popStream != null) {
+ IOUtils.close(LOG, popStream);
+ }
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index b421177b44..009e6396e0 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -66,6 +66,9 @@ import
org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_EC_GRPC_ZERO_COPY_ENABLED;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_EC_GRPC_ZERO_COPY_ENABLED_DEFAULT;
+
/**
* Creates a Grpc server endpoint that acts as the communication layer for
* Ozone containers.
@@ -131,8 +134,13 @@ public final class XceiverServerGrpc implements
XceiverServerSpi {
eventLoopGroup = new NioEventLoopGroup(poolSize / 10, factory);
channelType = NioServerSocketChannel.class;
}
+ final boolean zeroCopyEnabled = conf.getBoolean(
+ OZONE_EC_GRPC_ZERO_COPY_ENABLED,
+ OZONE_EC_GRPC_ZERO_COPY_ENABLED_DEFAULT);
LOG.info("GrpcServer channel type {}", channelType.getSimpleName());
+ GrpcXceiverService xceiverService = new GrpcXceiverService(dispatcher,
+ zeroCopyEnabled);
NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
.maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
.bossEventLoopGroup(eventLoopGroup)
@@ -140,7 +148,8 @@ public final class XceiverServerGrpc implements
XceiverServerSpi {
.channelType(channelType)
.executor(readExecutors)
.addService(ServerInterceptors.intercept(
- new GrpcXceiverService(dispatcher), new GrpcServerInterceptor()));
+ xceiverService.bindServiceWithZeroCopy(),
+ new GrpcServerInterceptor()));
SecurityConfig secConf = new SecurityConfig(conf);
if (secConf.isSecurityEnabled() && secConf.isGrpcTlsEnabled()) {
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/AbstractTestECKeyOutputStream.java
similarity index 98%
copy from
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
copy to
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/AbstractTestECKeyOutputStream.java
index a429b94d5a..518893aa0a 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/AbstractTestECKeyOutputStream.java
@@ -66,7 +66,7 @@ import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTER
/**
* Tests key output stream.
*/
-public class TestECKeyOutputStream {
+abstract class AbstractTestECKeyOutputStream {
private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf = new OzoneConfiguration();
private static OzoneClient client;
@@ -85,8 +85,7 @@ public class TestECKeyOutputStream {
/**
* Create a MiniDFSCluster for testing.
*/
- @BeforeClass
- public static void init() throws Exception {
+ protected static void init(boolean zeroCopyEnabled) throws Exception {
chunkSize = 1024 * 1024;
flushSize = 2 * chunkSize;
maxFlushSize = 2 * flushSize;
@@ -115,6 +114,8 @@ public class TestECKeyOutputStream {
TimeUnit.MILLISECONDS);
conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1,
TimeUnit.SECONDS);
+ conf.setBoolean(OzoneConfigKeys.OZONE_EC_GRPC_ZERO_COPY_ENABLED,
+ zeroCopyEnabled);
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(10)
.setTotalPipelineNumLimit(10).setBlockSize(blockSize)
.setChunkSize(chunkSize).setStreamBufferFlushSize(flushSize)
@@ -131,6 +132,11 @@ public class TestECKeyOutputStream {
initInputChunks();
}
+ @BeforeClass
+ public static void init() throws Exception {
+ init(false);
+ }
+
/**
* Shutdown MiniDFSCluster.
*/
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index a429b94d5a..dc5622e1e8 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@ -14,440 +14,18 @@
* License for the specific language governing permissions and limitations
under
* the License.
*/
+
package org.apache.hadoop.ozone.client.rpc;
-import org.apache.hadoop.conf.StorageUnit;
-import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
-import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.client.RatisReplicationConfig;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.utils.IOUtils;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.client.BucketArgs;
-import org.apache.hadoop.ozone.client.ObjectStore;
-import org.apache.hadoop.ozone.client.OzoneBucket;
-import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.client.OzoneClientFactory;
-import org.apache.hadoop.ozone.client.OzoneKey;
-import org.apache.hadoop.ozone.client.OzoneKeyDetails;
-import org.apache.hadoop.ozone.client.OzoneVolume;
-import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
-import org.apache.hadoop.ozone.client.io.KeyOutputStream;
-import org.apache.hadoop.ozone.client.io.OzoneInputStream;
-import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.container.TestHelper;
-import org.apache.ozone.test.GenericTestUtils;
-import org.junit.AfterClass;
-import org.junit.Assert;
import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
-import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
/**
- * Tests key output stream.
+ * Tests key output stream without zero-copy enabled.
*/
-public class TestECKeyOutputStream {
- private static MiniOzoneCluster cluster;
- private static OzoneConfiguration conf = new OzoneConfiguration();
- private static OzoneClient client;
- private static ObjectStore objectStore;
- private static int chunkSize;
- private static int flushSize;
- private static int maxFlushSize;
- private static int blockSize;
- private static String volumeName;
- private static String bucketName;
- private static String keyString;
- private static int dataBlocks = 3;
- private static int inputSize = dataBlocks * chunkSize;
- private static byte[][] inputChunks = new byte[dataBlocks][chunkSize];
-
- /**
- * Create a MiniDFSCluster for testing.
- */
+public class TestECKeyOutputStream extends
+ AbstractTestECKeyOutputStream {
@BeforeClass
public static void init() throws Exception {
- chunkSize = 1024 * 1024;
- flushSize = 2 * chunkSize;
- maxFlushSize = 2 * flushSize;
- blockSize = 2 * maxFlushSize;
-
- OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
- clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE);
- clientConfig.setStreamBufferFlushDelay(false);
- conf.setFromObject(clientConfig);
-
- // If SCM detects dead node too quickly, then container would be moved to
- // closed state and all in progress writes will get exception. To avoid
- // that, we are just keeping higher timeout and none of the tests depending
- // on deadnode detection timeout currently.
- conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
- conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 60, TimeUnit.SECONDS);
- conf.setTimeDuration("hdds.ratis.raft.server.rpc.slowness.timeout", 300,
- TimeUnit.SECONDS);
- conf.setTimeDuration(
- "hdds.ratis.raft.server.notification.no-leader.timeout", 300,
- TimeUnit.SECONDS);
- conf.setQuietMode(false);
- conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
- StorageUnit.MB);
- conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 500,
- TimeUnit.MILLISECONDS);
- conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1,
- TimeUnit.SECONDS);
- cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(10)
- .setTotalPipelineNumLimit(10).setBlockSize(blockSize)
- .setChunkSize(chunkSize).setStreamBufferFlushSize(flushSize)
- .setStreamBufferMaxSize(maxFlushSize)
- .setStreamBufferSizeUnit(StorageUnit.BYTES).build();
- cluster.waitForClusterToBeReady();
- client = OzoneClientFactory.getRpcClient(conf);
- objectStore = client.getObjectStore();
- keyString = UUID.randomUUID().toString();
- volumeName = "testeckeyoutputstream";
- bucketName = volumeName;
- objectStore.createVolume(volumeName);
- objectStore.getVolume(volumeName).createBucket(bucketName);
- initInputChunks();
- }
-
- /**
- * Shutdown MiniDFSCluster.
- */
- @AfterClass
- public static void shutdown() {
- IOUtils.closeQuietly(client);
- if (cluster != null) {
- cluster.shutdown();
- }
- }
-
- @Test
- public void testCreateKeyWithECReplicationConfig() throws Exception {
- try (OzoneOutputStream key = TestHelper
- .createKey(keyString, new ECReplicationConfig(3, 2,
- ECReplicationConfig.EcCodec.RS, chunkSize), inputSize,
- objectStore, volumeName, bucketName)) {
- Assert.assertTrue(key.getOutputStream() instanceof ECKeyOutputStream);
- }
- }
-
- @Test
- public void testCreateKeyWithOutBucketDefaults() throws Exception {
- OzoneVolume volume = objectStore.getVolume(volumeName);
- OzoneBucket bucket = volume.getBucket(bucketName);
- try (OzoneOutputStream out = bucket.createKey("myKey", inputSize)) {
- Assert.assertTrue(out.getOutputStream() instanceof KeyOutputStream);
- for (int i = 0; i < inputChunks.length; i++) {
- out.write(inputChunks[i]);
- }
- }
- }
-
- @Test
- public void testCreateKeyWithBucketDefaults() throws Exception {
- String myBucket = UUID.randomUUID().toString();
- OzoneVolume volume = objectStore.getVolume(volumeName);
- final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
- bucketArgs.setDefaultReplicationConfig(
- new DefaultReplicationConfig(
- new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
- chunkSize)));
-
- volume.createBucket(myBucket, bucketArgs.build());
- OzoneBucket bucket = volume.getBucket(myBucket);
-
- try (OzoneOutputStream out = bucket.createKey(keyString, inputSize)) {
- Assert.assertTrue(out.getOutputStream() instanceof ECKeyOutputStream);
- for (int i = 0; i < inputChunks.length; i++) {
- out.write(inputChunks[i]);
- }
- }
- byte[] buf = new byte[chunkSize];
- try (OzoneInputStream in = bucket.readKey(keyString)) {
- for (int i = 0; i < inputChunks.length; i++) {
- int read = in.read(buf, 0, chunkSize);
- Assert.assertEquals(chunkSize, read);
- Assert.assertTrue(Arrays.equals(buf, inputChunks[i]));
- }
- }
- }
-
- @Test
- public void testOverwriteECKeyWithRatisKey() throws Exception {
- String myBucket = UUID.randomUUID().toString();
- OzoneVolume volume = objectStore.getVolume(volumeName);
- final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
- volume.createBucket(myBucket, bucketArgs.build());
- OzoneBucket bucket = volume.getBucket(myBucket);
- createKeyAndCheckReplicationConfig(keyString, bucket,
- new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
- chunkSize));
-
- //Overwrite with RATIS/THREE
- createKeyAndCheckReplicationConfig(keyString, bucket,
-
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
-
- //Overwrite with RATIS/ONE
- createKeyAndCheckReplicationConfig(keyString, bucket,
- RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE));
- }
-
- @Test
- public void testOverwriteRatisKeyWithECKey() throws Exception {
- String myBucket = UUID.randomUUID().toString();
- OzoneVolume volume = objectStore.getVolume(volumeName);
- final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
- volume.createBucket(myBucket, bucketArgs.build());
- OzoneBucket bucket = volume.getBucket(myBucket);
-
- createKeyAndCheckReplicationConfig(keyString, bucket,
-
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
- // Overwrite with EC key
- createKeyAndCheckReplicationConfig(keyString, bucket,
- new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
- chunkSize));
- }
-
- private void createKeyAndCheckReplicationConfig(String keyName,
- OzoneBucket bucket, ReplicationConfig replicationConfig)
- throws IOException {
- try (OzoneOutputStream out = bucket
- .createKey(keyName, inputSize, replicationConfig, new HashMap<>())) {
- for (int i = 0; i < inputChunks.length; i++) {
- out.write(inputChunks[i]);
- }
- }
- OzoneKeyDetails key = bucket.getKey(keyName);
- Assert.assertEquals(replicationConfig, key.getReplicationConfig());
- }
-
- @Test
- public void testCreateRatisKeyAndWithECBucketDefaults() throws Exception {
- OzoneBucket bucket = getOzoneBucket();
- try (OzoneOutputStream out = bucket.createKey(
- "testCreateRatisKeyAndWithECBucketDefaults", 2000,
- RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
- new HashMap<>())) {
- Assert.assertTrue(out.getOutputStream() instanceof KeyOutputStream);
- for (int i = 0; i < inputChunks.length; i++) {
- out.write(inputChunks[i]);
- }
- }
- }
-
- @Test
- public void test13ChunksInSingleWriteOp() throws IOException {
- testMultipleChunksInSingleWriteOp(13);
- }
-
- @Test
- public void testChunksInSingleWriteOpWithOffset() throws IOException {
- testMultipleChunksInSingleWriteOp(11, 25, 19);
- }
-
- @Test
- public void test15ChunksInSingleWriteOp() throws IOException {
- testMultipleChunksInSingleWriteOp(15);
- }
-
- @Test
- public void test20ChunksInSingleWriteOp() throws IOException {
- testMultipleChunksInSingleWriteOp(20);
+ init(false);
}
-
- @Test
- public void test21ChunksInSingleWriteOp() throws IOException {
- testMultipleChunksInSingleWriteOp(21);
- }
-
- private void testMultipleChunksInSingleWriteOp(int offset,
- int bufferChunks, int
numChunks)
- throws IOException {
- byte[] inputData = getInputBytes(offset, bufferChunks, numChunks);
- final OzoneBucket bucket = getOzoneBucket();
- String keyName =
- String.format("testMultipleChunksInSingleWriteOpOffset" +
- "%dBufferChunks%dNumChunks", offset, bufferChunks,
- numChunks);
- try (OzoneOutputStream out = bucket.createKey(keyName, 4096,
- new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
- chunkSize), new HashMap<>())) {
- out.write(inputData, offset, numChunks * chunkSize);
- }
-
- validateContent(offset, numChunks * chunkSize, inputData, bucket,
- bucket.getKey(keyName));
- }
-
- private void testMultipleChunksInSingleWriteOp(int numChunks)
- throws IOException {
- testMultipleChunksInSingleWriteOp(0, numChunks, numChunks);
- }
-
- @Test
- public void testECContainerKeysCountAndNumContainerReplicas()
- throws IOException, InterruptedException, TimeoutException {
- byte[] inputData = getInputBytes(1);
- final OzoneBucket bucket = getOzoneBucket();
- ContainerOperationClient containerOperationClient =
- new ContainerOperationClient(conf);
-
- ECReplicationConfig repConfig = new ECReplicationConfig(
- 3, 2, ECReplicationConfig.EcCodec.RS, chunkSize);
- // Close all EC pipelines so we must get a fresh pipeline and hence
- // container for this test.
- PipelineManager pm =
- cluster.getStorageContainerManager().getPipelineManager();
- for (Pipeline p : pm.getPipelines(repConfig)) {
- pm.closePipeline(p, true);
- }
-
- String keyName = UUID.randomUUID().toString();
- try (OzoneOutputStream out = bucket.createKey(keyName, 4096,
- repConfig, new HashMap<>())) {
- out.write(inputData);
- }
- OzoneKeyDetails key = bucket.getKey(keyName);
- long currentKeyContainerID =
- key.getOzoneKeyLocations().get(0).getContainerID();
-
- GenericTestUtils.waitFor(() -> {
- try {
- return (containerOperationClient.getContainer(currentKeyContainerID)
- .getNumberOfKeys() == 1) && (containerOperationClient
- .getContainerReplicas(currentKeyContainerID).size() == 5);
- } catch (IOException exception) {
- Assert.fail("Unexpected exception " + exception);
- return false;
- }
- }, 100, 10000);
- validateContent(inputData, bucket, key);
- }
-
- private void validateContent(byte[] inputData, OzoneBucket bucket,
- OzoneKey key) throws IOException {
- validateContent(0, inputData.length, inputData, bucket, key);
- }
-
- private void validateContent(int offset, int length, byte[] inputData,
- OzoneBucket bucket,
- OzoneKey key) throws IOException {
- try (OzoneInputStream is = bucket.readKey(key.getName())) {
- byte[] fileContent = new byte[length];
- Assert.assertEquals(length, is.read(fileContent));
- Assert.assertEquals(new String(Arrays.copyOfRange(inputData, offset,
- offset + length), UTF_8),
- new String(fileContent, UTF_8));
- }
- }
-
- private OzoneBucket getOzoneBucket() throws IOException {
- String myBucket = UUID.randomUUID().toString();
- OzoneVolume volume = objectStore.getVolume(volumeName);
- final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
- bucketArgs.setDefaultReplicationConfig(
- new DefaultReplicationConfig(
- new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
- chunkSize)));
-
- volume.createBucket(myBucket, bucketArgs.build());
- return volume.getBucket(myBucket);
- }
-
- private static void initInputChunks() {
- for (int i = 0; i < dataBlocks; i++) {
- inputChunks[i] = getBytesWith(i + 1, chunkSize);
- }
- }
-
- private static byte[] getBytesWith(int singleDigitNumber, int total) {
- StringBuilder builder = new StringBuilder(singleDigitNumber);
- for (int i = 1; i <= total; i++) {
- builder.append(singleDigitNumber);
- }
- return builder.toString().getBytes(UTF_8);
- }
-
- @Test
- public void testWriteShouldSucceedWhenDNKilled() throws Exception {
- int numChunks = 3;
- byte[] inputData = getInputBytes(numChunks);
- final OzoneBucket bucket = getOzoneBucket();
- String keyName = "testWriteShouldSucceedWhenDNKilled" + numChunks;
- DatanodeDetails nodeToKill = null;
- try {
- try (OzoneOutputStream out = bucket.createKey(keyName, 1024,
- new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
- chunkSize), new HashMap<>())) {
- ECKeyOutputStream ecOut = (ECKeyOutputStream) out.getOutputStream();
- out.write(inputData);
- // Kill a node from first pipeline
- nodeToKill = ecOut.getStreamEntries()
- .get(0).getPipeline().getFirstNode();
- cluster.shutdownHddsDatanode(nodeToKill);
-
- out.write(inputData);
-
- // Wait for flushing thread to finish its work.
- final long checkpoint = System.currentTimeMillis();
- ecOut.insertFlushCheckpoint(checkpoint);
- GenericTestUtils.waitFor(() -> ecOut.getFlushCheckpoint() ==
checkpoint,
- 100, 10000);
-
- // Check the second blockGroup pipeline to make sure that the failed
- // node is not selected.
- Assert.assertFalse(ecOut.getStreamEntries()
- .get(1).getPipeline().getNodes().contains(nodeToKill));
- }
-
- try (OzoneInputStream is = bucket.readKey(keyName)) {
- // We wrote "inputData" twice, so do two reads and ensure the correct
- // data comes back.
- for (int i = 0; i < 2; i++) {
- byte[] fileContent = new byte[inputData.length];
- Assert.assertEquals(inputData.length, is.read(fileContent));
- Assert.assertEquals(new String(inputData, UTF_8),
- new String(fileContent, UTF_8));
- }
- }
- } finally {
- cluster.restartHddsDatanode(nodeToKill, true);
- }
- }
-
- private byte[] getInputBytes(int numChunks) {
- return getInputBytes(0, numChunks, numChunks);
- }
-
- private byte[] getInputBytes(int offset, int bufferChunks, int numChunks) {
- byte[] inputData = new byte[offset + bufferChunks * chunkSize];
- for (int i = 0; i < numChunks; i++) {
- int start = offset + (i * chunkSize);
- Arrays.fill(inputData, start, start + chunkSize - 1,
- String.valueOf(i % 9).getBytes(UTF_8)[0]);
- }
- return inputData;
- }
-
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStreamWithZeroCopy.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStreamWithZeroCopy.java
new file mode 100644
index 0000000000..b9baeb2437
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStreamWithZeroCopy.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.junit.BeforeClass;
+
+/**
+ * Tests key output stream with zero-copy enabled.
+ */
+public class TestECKeyOutputStreamWithZeroCopy extends
+ AbstractTestECKeyOutputStream {
+ @BeforeClass
+ public static void init() throws Exception {
+ init(true);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]