This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new eb7e8d009e HDDS-9668. Zero-Copy in EC GRPC (write) (#5621)
eb7e8d009e is described below

commit eb7e8d009e701d1dc7a792deca1b5d6893d051c0
Author: Duong Nguyen <[email protected]>
AuthorDate: Mon Dec 4 01:10:39 2023 -0800

    HDDS-9668. Zero-Copy in EC GRPC (write) (#5621)
---
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   4 +
 .../common/src/main/resources/ozone-default.xml    |   8 +
 .../transport/server/GrpcXceiverService.java       |  74 +++-
 .../common/transport/server/XceiverServerGrpc.java |  11 +-
 ...eam.java => AbstractTestECKeyOutputStream.java} |  12 +-
 .../ozone/client/rpc/TestECKeyOutputStream.java    | 432 +--------------------
 .../rpc/TestECKeyOutputStreamWithZeroCopy.java     |  31 ++
 7 files changed, 134 insertions(+), 438 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 0b62b887a3..89c9be5467 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -188,6 +188,10 @@ public final class OzoneConfigKeys {
       "ozone.client.ec.grpc.write.timeout";
   public static final String OZONE_CLIENT_EC_GRPC_WRITE_TIMEOUT_DEFAULT = "30s";
 
+  public static final String OZONE_EC_GRPC_ZERO_COPY_ENABLED =
+      "ozone.ec.grpc.zerocopy.enabled";
+  public static final boolean OZONE_EC_GRPC_ZERO_COPY_ENABLED_DEFAULT = true;
+
   /**
    * Ozone administrator users delimited by comma.
    * If not set, only the user who launches an ozone service will be the
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 65019796b8..7d8f538178 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -4248,4 +4248,12 @@
       to existing buckets till this operation is completed.
     </description>
   </property>
+  <property>
+    <name>ozone.ec.grpc.zerocopy.enabled</name>
+    <value>true</value>
+    <tag>OZONE, DATANODE</tag>
+    <description>
+      Specify if zero-copy should be enabled for EC GRPC protocol.
+    </description>
+  </property>
 </configuration>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
index 023786fdb0..9c3f29d0f0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
@@ -17,19 +17,25 @@
  */
 
 package org.apache.hadoop.ozone.container.common.transport.server;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto
-    .XceiverClientProtocolServiceGrpc;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
+import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller;
+import org.apache.ratis.thirdparty.com.google.protobuf.MessageLite;
+import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor;
+import org.apache.ratis.thirdparty.io.grpc.ServerCallHandler;
+import org.apache.ratis.thirdparty.io.grpc.ServerServiceDefinition;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.InputStream;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.getSendMethod;
+
 /**
  * Grpc Service for handling Container Commands on datanode.
  */
@@ -39,9 +45,58 @@ public class GrpcXceiverService extends
       LOG = LoggerFactory.getLogger(GrpcXceiverService.class);
 
   private final ContainerDispatcher dispatcher;
+  private final boolean zeroCopyEnabled;
+  private final ZeroCopyMessageMarshaller<ContainerCommandRequestProto>
+      zeroCopyMessageMarshaller = new ZeroCopyMessageMarshaller<>(
+          ContainerCommandRequestProto.getDefaultInstance());
 
-  public GrpcXceiverService(ContainerDispatcher dispatcher) {
+  public GrpcXceiverService(ContainerDispatcher dispatcher,
+      boolean zeroCopyEnabled) {
     this.dispatcher = dispatcher;
+    this.zeroCopyEnabled = zeroCopyEnabled;
+  }
+
+  /**
+   * Bind service with zerocopy marshaller equipped for the `send` API if
+   * zerocopy is enabled.
+   * @return  service definition.
+   */
+  public ServerServiceDefinition bindServiceWithZeroCopy() {
+    ServerServiceDefinition orig = super.bindService();
+    if (!zeroCopyEnabled) {
+      LOG.info("Zerocopy is not enabled.");
+      return orig;
+    }
+
+    ServerServiceDefinition.Builder builder =
+        ServerServiceDefinition.builder(orig.getServiceDescriptor().getName());
+    // Add `send` method with zerocopy marshaller.
+    addZeroCopyMethod(orig, builder, getSendMethod(),
+        zeroCopyMessageMarshaller);
+    // Add other methods as is.
+    orig.getMethods().stream().filter(
+        x -> !x.getMethodDescriptor().getFullMethodName().equals(
+            getSendMethod().getFullMethodName())
+    ).forEach(
+        builder::addMethod
+    );
+
+    return builder.build();
+  }
+
+  private static <Req extends MessageLite, Resp> void addZeroCopyMethod(
+      ServerServiceDefinition orig,
+      ServerServiceDefinition.Builder newServiceBuilder,
+      MethodDescriptor<Req, Resp> origMethod,
+      ZeroCopyMessageMarshaller<Req> zeroCopyMarshaller) {
+    MethodDescriptor<Req, Resp> newMethod = origMethod.toBuilder()
+        .setRequestMarshaller(zeroCopyMarshaller)
+        .build();
+    @SuppressWarnings("unchecked")
+    ServerCallHandler<Req, Resp> serverCallHandler =
+        (ServerCallHandler<Req, Resp>) orig.getMethod(
+            newMethod.getFullMethodName()).getServerCallHandler();
+    newServiceBuilder.addMethod(newMethod, serverCallHandler);
   }
 
   @Override
@@ -61,6 +116,11 @@ public class GrpcXceiverService extends
                     + " ContainerCommandRequestProto {}", request, e);
           isClosed.set(true);
           responseObserver.onError(e);
+        } finally {
+          InputStream popStream = zeroCopyMessageMarshaller.popStream(request);
+          if (popStream != null) {
+            IOUtils.close(LOG, popStream);
+          }
         }
       }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index b421177b44..009e6396e0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -66,6 +66,9 @@ import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_EC_GRPC_ZERO_COPY_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_EC_GRPC_ZERO_COPY_ENABLED_DEFAULT;
+
 /**
  * Creates a Grpc server endpoint that acts as the communication layer for
  * Ozone containers.
@@ -131,8 +134,13 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
       eventLoopGroup = new NioEventLoopGroup(poolSize / 10, factory);
       channelType = NioServerSocketChannel.class;
     }
+    final boolean zeroCopyEnabled = conf.getBoolean(
+        OZONE_EC_GRPC_ZERO_COPY_ENABLED,
+        OZONE_EC_GRPC_ZERO_COPY_ENABLED_DEFAULT);
 
     LOG.info("GrpcServer channel type {}", channelType.getSimpleName());
+    GrpcXceiverService xceiverService = new GrpcXceiverService(dispatcher,
+        zeroCopyEnabled);
     NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
         .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
         .bossEventLoopGroup(eventLoopGroup)
@@ -140,7 +148,8 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
         .channelType(channelType)
         .executor(readExecutors)
         .addService(ServerInterceptors.intercept(
-            new GrpcXceiverService(dispatcher), new GrpcServerInterceptor()));
+            xceiverService.bindServiceWithZeroCopy(),
+            new GrpcServerInterceptor()));
 
     SecurityConfig secConf = new SecurityConfig(conf);
     if (secConf.isSecurityEnabled() && secConf.isGrpcTlsEnabled()) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/AbstractTestECKeyOutputStream.java
similarity index 98%
copy from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
copy to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/AbstractTestECKeyOutputStream.java
index a429b94d5a..518893aa0a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/AbstractTestECKeyOutputStream.java
@@ -66,7 +66,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTER
 /**
  * Tests key output stream.
  */
-public class TestECKeyOutputStream {
+abstract class AbstractTestECKeyOutputStream {
   private static MiniOzoneCluster cluster;
   private static OzoneConfiguration conf = new OzoneConfiguration();
   private static OzoneClient client;
@@ -85,8 +85,7 @@ public class TestECKeyOutputStream {
   /**
    * Create a MiniDFSCluster for testing.
    */
-  @BeforeClass
-  public static void init() throws Exception {
+  protected static void init(boolean zeroCopyEnabled) throws Exception {
     chunkSize = 1024 * 1024;
     flushSize = 2 * chunkSize;
     maxFlushSize = 2 * flushSize;
@@ -115,6 +114,8 @@ public class TestECKeyOutputStream {
         TimeUnit.MILLISECONDS);
     conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1,
         TimeUnit.SECONDS);
+    conf.setBoolean(OzoneConfigKeys.OZONE_EC_GRPC_ZERO_COPY_ENABLED,
+        zeroCopyEnabled);
     cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(10)
         .setTotalPipelineNumLimit(10).setBlockSize(blockSize)
         .setChunkSize(chunkSize).setStreamBufferFlushSize(flushSize)
@@ -131,6 +132,11 @@ public class TestECKeyOutputStream {
     initInputChunks();
   }
 
+  @BeforeClass
+  public static void init() throws Exception {
+    init(false);
+  }
+
   /**
    * Shutdown MiniDFSCluster.
    */
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index a429b94d5a..dc5622e1e8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@ -14,440 +14,18 @@
  * License for the specific language governing permissions and limitations under
  * the License.
  */
+
 package org.apache.hadoop.ozone.client.rpc;
 
-import org.apache.hadoop.conf.StorageUnit;
-import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
-import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.client.RatisReplicationConfig;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.utils.IOUtils;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.client.BucketArgs;
-import org.apache.hadoop.ozone.client.ObjectStore;
-import org.apache.hadoop.ozone.client.OzoneBucket;
-import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.client.OzoneClientFactory;
-import org.apache.hadoop.ozone.client.OzoneKey;
-import org.apache.hadoop.ozone.client.OzoneKeyDetails;
-import org.apache.hadoop.ozone.client.OzoneVolume;
-import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
-import org.apache.hadoop.ozone.client.io.KeyOutputStream;
-import org.apache.hadoop.ozone.client.io.OzoneInputStream;
-import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.container.TestHelper;
-import org.apache.ozone.test.GenericTestUtils;
-import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 
 /**
- * Tests key output stream.
+ * Tests key output stream without zero-copy enabled.
  */
-public class TestECKeyOutputStream {
-  private static MiniOzoneCluster cluster;
-  private static OzoneConfiguration conf = new OzoneConfiguration();
-  private static OzoneClient client;
-  private static ObjectStore objectStore;
-  private static int chunkSize;
-  private static int flushSize;
-  private static int maxFlushSize;
-  private static int blockSize;
-  private static String volumeName;
-  private static String bucketName;
-  private static String keyString;
-  private static int dataBlocks = 3;
-  private static int inputSize = dataBlocks * chunkSize;
-  private static byte[][] inputChunks = new byte[dataBlocks][chunkSize];
-
-  /**
-   * Create a MiniDFSCluster for testing.
-   */
+public class TestECKeyOutputStream extends
+    AbstractTestECKeyOutputStream {
   @BeforeClass
   public static void init() throws Exception {
-    chunkSize = 1024 * 1024;
-    flushSize = 2 * chunkSize;
-    maxFlushSize = 2 * flushSize;
-    blockSize = 2 * maxFlushSize;
-
-    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
-    clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE);
-    clientConfig.setStreamBufferFlushDelay(false);
-    conf.setFromObject(clientConfig);
-
-    // If SCM detects dead node too quickly, then container would be moved to
-    // closed state and all in progress writes will get exception. To avoid
-    // that, we are just keeping higher timeout and none of the tests depending
-    // on deadnode detection timeout currently.
-    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
-    conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 60, TimeUnit.SECONDS);
-    conf.setTimeDuration("hdds.ratis.raft.server.rpc.slowness.timeout", 300,
-        TimeUnit.SECONDS);
-    conf.setTimeDuration(
-        "hdds.ratis.raft.server.notification.no-leader.timeout", 300,
-        TimeUnit.SECONDS);
-    conf.setQuietMode(false);
-    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
-        StorageUnit.MB);
-    conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 500,
-        TimeUnit.MILLISECONDS);
-    conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1,
-        TimeUnit.SECONDS);
-    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(10)
-        .setTotalPipelineNumLimit(10).setBlockSize(blockSize)
-        .setChunkSize(chunkSize).setStreamBufferFlushSize(flushSize)
-        .setStreamBufferMaxSize(maxFlushSize)
-        .setStreamBufferSizeUnit(StorageUnit.BYTES).build();
-    cluster.waitForClusterToBeReady();
-    client = OzoneClientFactory.getRpcClient(conf);
-    objectStore = client.getObjectStore();
-    keyString = UUID.randomUUID().toString();
-    volumeName = "testeckeyoutputstream";
-    bucketName = volumeName;
-    objectStore.createVolume(volumeName);
-    objectStore.getVolume(volumeName).createBucket(bucketName);
-    initInputChunks();
-  }
-
-  /**
-   * Shutdown MiniDFSCluster.
-   */
-  @AfterClass
-  public static void shutdown() {
-    IOUtils.closeQuietly(client);
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  @Test
-  public void testCreateKeyWithECReplicationConfig() throws Exception {
-    try (OzoneOutputStream key = TestHelper
-        .createKey(keyString, new ECReplicationConfig(3, 2,
-              ECReplicationConfig.EcCodec.RS, chunkSize), inputSize,
-            objectStore, volumeName, bucketName)) {
-      Assert.assertTrue(key.getOutputStream() instanceof ECKeyOutputStream);
-    }
-  }
-
-  @Test
-  public void testCreateKeyWithOutBucketDefaults() throws Exception {
-    OzoneVolume volume = objectStore.getVolume(volumeName);
-    OzoneBucket bucket = volume.getBucket(bucketName);
-    try (OzoneOutputStream out = bucket.createKey("myKey", inputSize)) {
-      Assert.assertTrue(out.getOutputStream() instanceof KeyOutputStream);
-      for (int i = 0; i < inputChunks.length; i++) {
-        out.write(inputChunks[i]);
-      }
-    }
-  }
-
-  @Test
-  public void testCreateKeyWithBucketDefaults() throws Exception {
-    String myBucket = UUID.randomUUID().toString();
-    OzoneVolume volume = objectStore.getVolume(volumeName);
-    final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
-    bucketArgs.setDefaultReplicationConfig(
-        new DefaultReplicationConfig(
-            new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
-                chunkSize)));
-
-    volume.createBucket(myBucket, bucketArgs.build());
-    OzoneBucket bucket = volume.getBucket(myBucket);
-
-    try (OzoneOutputStream out = bucket.createKey(keyString, inputSize)) {
-      Assert.assertTrue(out.getOutputStream() instanceof ECKeyOutputStream);
-      for (int i = 0; i < inputChunks.length; i++) {
-        out.write(inputChunks[i]);
-      }
-    }
-    byte[] buf = new byte[chunkSize];
-    try (OzoneInputStream in = bucket.readKey(keyString)) {
-      for (int i = 0; i < inputChunks.length; i++) {
-        int read = in.read(buf, 0, chunkSize);
-        Assert.assertEquals(chunkSize, read);
-        Assert.assertTrue(Arrays.equals(buf, inputChunks[i]));
-      }
-    }
-  }
-
-  @Test
-  public void testOverwriteECKeyWithRatisKey() throws Exception {
-    String myBucket = UUID.randomUUID().toString();
-    OzoneVolume volume = objectStore.getVolume(volumeName);
-    final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
-    volume.createBucket(myBucket, bucketArgs.build());
-    OzoneBucket bucket = volume.getBucket(myBucket);
-    createKeyAndCheckReplicationConfig(keyString, bucket,
-        new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
-            chunkSize));
-
-    //Overwrite with RATIS/THREE
-    createKeyAndCheckReplicationConfig(keyString, bucket,
-        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
-
-    //Overwrite with RATIS/ONE
-    createKeyAndCheckReplicationConfig(keyString, bucket,
-        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE));
-  }
-
-  @Test
-  public void testOverwriteRatisKeyWithECKey() throws Exception {
-    String myBucket = UUID.randomUUID().toString();
-    OzoneVolume volume = objectStore.getVolume(volumeName);
-    final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
-    volume.createBucket(myBucket, bucketArgs.build());
-    OzoneBucket bucket = volume.getBucket(myBucket);
-
-    createKeyAndCheckReplicationConfig(keyString, bucket,
-        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
-    // Overwrite with EC key
-    createKeyAndCheckReplicationConfig(keyString, bucket,
-        new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
-            chunkSize));
-  }
-
-  private void createKeyAndCheckReplicationConfig(String keyName,
-      OzoneBucket bucket, ReplicationConfig replicationConfig)
-      throws IOException {
-    try (OzoneOutputStream out = bucket
-        .createKey(keyName, inputSize, replicationConfig, new HashMap<>())) {
-      for (int i = 0; i < inputChunks.length; i++) {
-        out.write(inputChunks[i]);
-      }
-    }
-    OzoneKeyDetails key = bucket.getKey(keyName);
-    Assert.assertEquals(replicationConfig, key.getReplicationConfig());
-  }
-
-  @Test
-  public void testCreateRatisKeyAndWithECBucketDefaults() throws Exception {
-    OzoneBucket bucket = getOzoneBucket();
-    try (OzoneOutputStream out = bucket.createKey(
-        "testCreateRatisKeyAndWithECBucketDefaults", 2000,
-        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
-        new HashMap<>())) {
-      Assert.assertTrue(out.getOutputStream() instanceof KeyOutputStream);
-      for (int i = 0; i < inputChunks.length; i++) {
-        out.write(inputChunks[i]);
-      }
-    }
-  }
-
-  @Test
-  public void test13ChunksInSingleWriteOp() throws IOException {
-    testMultipleChunksInSingleWriteOp(13);
-  }
-
-  @Test
-  public void testChunksInSingleWriteOpWithOffset() throws IOException {
-    testMultipleChunksInSingleWriteOp(11, 25, 19);
-  }
-
-  @Test
-  public void test15ChunksInSingleWriteOp() throws IOException {
-    testMultipleChunksInSingleWriteOp(15);
-  }
-
-  @Test
-  public void test20ChunksInSingleWriteOp() throws IOException {
-    testMultipleChunksInSingleWriteOp(20);
+    init(false);
   }
-
-  @Test
-  public void test21ChunksInSingleWriteOp() throws IOException {
-    testMultipleChunksInSingleWriteOp(21);
-  }
-
-  private void testMultipleChunksInSingleWriteOp(int offset,
-                                                int bufferChunks, int numChunks)
-          throws IOException {
-    byte[] inputData = getInputBytes(offset, bufferChunks, numChunks);
-    final OzoneBucket bucket = getOzoneBucket();
-    String keyName =
-            String.format("testMultipleChunksInSingleWriteOpOffset" +
-                    "%dBufferChunks%dNumChunks", offset, bufferChunks,
-                    numChunks);
-    try (OzoneOutputStream out = bucket.createKey(keyName, 4096,
-        new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
-            chunkSize), new HashMap<>())) {
-      out.write(inputData, offset, numChunks * chunkSize);
-    }
-
-    validateContent(offset, numChunks * chunkSize, inputData, bucket,
-            bucket.getKey(keyName));
-  }
-
-  private void testMultipleChunksInSingleWriteOp(int numChunks)
-      throws IOException {
-    testMultipleChunksInSingleWriteOp(0, numChunks, numChunks);
-  }
-
-  @Test
-  public void testECContainerKeysCountAndNumContainerReplicas()
-      throws IOException, InterruptedException, TimeoutException {
-    byte[] inputData = getInputBytes(1);
-    final OzoneBucket bucket = getOzoneBucket();
-    ContainerOperationClient containerOperationClient =
-        new ContainerOperationClient(conf);
-
-    ECReplicationConfig repConfig = new ECReplicationConfig(
-        3, 2, ECReplicationConfig.EcCodec.RS, chunkSize);
-    // Close all EC pipelines so we must get a fresh pipeline and hence
-    // container for this test.
-    PipelineManager pm =
-        cluster.getStorageContainerManager().getPipelineManager();
-    for (Pipeline p : pm.getPipelines(repConfig)) {
-      pm.closePipeline(p, true);
-    }
-
-    String keyName = UUID.randomUUID().toString();
-    try (OzoneOutputStream out = bucket.createKey(keyName, 4096,
-        repConfig, new HashMap<>())) {
-      out.write(inputData);
-    }
-    OzoneKeyDetails key = bucket.getKey(keyName);
-    long currentKeyContainerID =
-        key.getOzoneKeyLocations().get(0).getContainerID();
-
-    GenericTestUtils.waitFor(() -> {
-      try {
-        return (containerOperationClient.getContainer(currentKeyContainerID)
-            .getNumberOfKeys() == 1) && (containerOperationClient
-            .getContainerReplicas(currentKeyContainerID).size() == 5);
-      } catch (IOException exception) {
-        Assert.fail("Unexpected exception " + exception);
-        return false;
-      }
-    }, 100, 10000);
-    validateContent(inputData, bucket, key);
-  }
-
-  private void validateContent(byte[] inputData, OzoneBucket bucket,
-                               OzoneKey key) throws IOException {
-    validateContent(0, inputData.length, inputData, bucket, key);
-  }
-
-  private void validateContent(int offset, int length, byte[] inputData,
-                               OzoneBucket bucket,
-      OzoneKey key) throws IOException {
-    try (OzoneInputStream is = bucket.readKey(key.getName())) {
-      byte[] fileContent = new byte[length];
-      Assert.assertEquals(length, is.read(fileContent));
-      Assert.assertEquals(new String(Arrays.copyOfRange(inputData, offset,
-                      offset + length), UTF_8),
-          new String(fileContent, UTF_8));
-    }
-  }
-
-  private OzoneBucket getOzoneBucket() throws IOException {
-    String myBucket = UUID.randomUUID().toString();
-    OzoneVolume volume = objectStore.getVolume(volumeName);
-    final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
-    bucketArgs.setDefaultReplicationConfig(
-        new DefaultReplicationConfig(
-            new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
-                chunkSize)));
-
-    volume.createBucket(myBucket, bucketArgs.build());
-    return volume.getBucket(myBucket);
-  }
-
-  private static void initInputChunks() {
-    for (int i = 0; i < dataBlocks; i++) {
-      inputChunks[i] = getBytesWith(i + 1, chunkSize);
-    }
-  }
-
-  private static byte[] getBytesWith(int singleDigitNumber, int total) {
-    StringBuilder builder = new StringBuilder(singleDigitNumber);
-    for (int i = 1; i <= total; i++) {
-      builder.append(singleDigitNumber);
-    }
-    return builder.toString().getBytes(UTF_8);
-  }
-
-  @Test
-  public void testWriteShouldSucceedWhenDNKilled() throws Exception {
-    int numChunks = 3;
-    byte[] inputData = getInputBytes(numChunks);
-    final OzoneBucket bucket = getOzoneBucket();
-    String keyName = "testWriteShouldSucceedWhenDNKilled" + numChunks;
-    DatanodeDetails nodeToKill = null;
-    try {
-      try (OzoneOutputStream out = bucket.createKey(keyName, 1024,
-          new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
-              chunkSize), new HashMap<>())) {
-        ECKeyOutputStream ecOut = (ECKeyOutputStream) out.getOutputStream();
-        out.write(inputData);
-        // Kill a node from first pipeline
-        nodeToKill = ecOut.getStreamEntries()
-            .get(0).getPipeline().getFirstNode();
-        cluster.shutdownHddsDatanode(nodeToKill);
-
-        out.write(inputData);
-
-        // Wait for flushing thread to finish its work.
-        final long checkpoint = System.currentTimeMillis();
-        ecOut.insertFlushCheckpoint(checkpoint);
-        GenericTestUtils.waitFor(() -> ecOut.getFlushCheckpoint() == checkpoint,
-            100, 10000);
-
-        // Check the second blockGroup pipeline to make sure that the failed
-        // node is not selected.
-        Assert.assertFalse(ecOut.getStreamEntries()
-            .get(1).getPipeline().getNodes().contains(nodeToKill));
-      }
-
-      try (OzoneInputStream is = bucket.readKey(keyName)) {
-        // We wrote "inputData" twice, so do two reads and ensure the correct
-        // data comes back.
-        for (int i = 0; i < 2; i++) {
-          byte[] fileContent = new byte[inputData.length];
-          Assert.assertEquals(inputData.length, is.read(fileContent));
-          Assert.assertEquals(new String(inputData, UTF_8),
-              new String(fileContent, UTF_8));
-        }
-      }
-    } finally {
-      cluster.restartHddsDatanode(nodeToKill, true);
-    }
-  }
-
-  private byte[] getInputBytes(int numChunks) {
-    return getInputBytes(0, numChunks, numChunks);
-  }
-
-  private byte[] getInputBytes(int offset, int bufferChunks, int numChunks) {
-    byte[] inputData = new byte[offset + bufferChunks * chunkSize];
-    for (int i = 0; i < numChunks; i++) {
-      int start = offset + (i * chunkSize);
-      Arrays.fill(inputData, start, start + chunkSize - 1,
-          String.valueOf(i % 9).getBytes(UTF_8)[0]);
-    }
-    return inputData;
-  }
-
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStreamWithZeroCopy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStreamWithZeroCopy.java
new file mode 100644
index 0000000000..b9baeb2437
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStreamWithZeroCopy.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.junit.BeforeClass;
+
+/**
+ * Tests key output stream with zero-copy enabled.
+ */
+public class TestECKeyOutputStreamWithZeroCopy extends
+    AbstractTestECKeyOutputStream {
+  @BeforeClass
+  public static void init() throws Exception {
+    init(true);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to