This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 58d144369c HDDS-10240. Cleanup zero-copy EC (#7340)
58d144369c is described below

commit 58d144369cd8955cd3214293c43d55411fbbace7
Author: Scolley <[email protected]>
AuthorDate: Mon Nov 4 02:06:32 2024 +0800

    HDDS-10240. Cleanup zero-copy EC (#7340)
---
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   3 -
 .../common/src/main/resources/ozone-default.xml    |   8 -
 .../transport/server/GrpcXceiverService.java       |  19 +-
 .../common/transport/server/XceiverServerGrpc.java |   9 +-
 .../replication/GrpcReplicationService.java        |  35 +-
 .../container/replication/ReplicationServer.java   |  29 +-
 .../replication/SendContainerRequestHandler.java   |   6 +-
 .../replication/TestGrpcReplicationService.java    |   8 +-
 .../TestGrpcReplicationServiceWithZeroCopy.java    |  31 --
 .../client/rpc/AbstractTestECKeyOutputStream.java  | 492 ---------------------
 .../ozone/client/rpc/TestECKeyOutputStream.java    | 466 ++++++++++++++++++-
 .../rpc/TestECKeyOutputStreamWithZeroCopy.java     |  31 --
 12 files changed, 479 insertions(+), 658 deletions(-)

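The recurring change across the server-side handlers below is the zero-copy cleanup path: the old code popped the marshalled stream from the marshaller and closed it manually, while the new code releases it with a single call. A minimal sketch of the resulting pattern, assuming the org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller#release method the hunks below rely on (the surrounding class and dispatch method are only illustrative, not part of the patch):

    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
    import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller;

    class ZeroCopyRequestHandlerSketch {
      private final ZeroCopyMessageMarshaller<ContainerCommandRequestProto> marshaller =
          new ZeroCopyMessageMarshaller<>(ContainerCommandRequestProto.getDefaultInstance());

      void handle(ContainerCommandRequestProto request) {
        try {
          dispatch(request);            // actual request handling elided
        } finally {
          // Before this patch: InputStream s = marshaller.popStream(request);
          //                    if (s != null) { IOUtils.close(LOG, s); }
          marshaller.release(request);  // one call now releases the zero-copy resources
        }
      }

      private void dispatch(ContainerCommandRequestProto request) {
        // placeholder for ContainerDispatcher.dispatch(...)
      }
    }
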
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index df0fdc59a4..5719803b94 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -201,9 +201,6 @@ public final class OzoneConfigKeys {
       "ozone.client.ec.grpc.write.timeout";
   public static final String OZONE_CLIENT_EC_GRPC_WRITE_TIMEOUT_DEFAULT = "30s";
 
-  public static final String OZONE_EC_GRPC_ZERO_COPY_ENABLED =
-      "ozone.ec.grpc.zerocopy.enabled";
-  public static final boolean OZONE_EC_GRPC_ZERO_COPY_ENABLED_DEFAULT = true;
 
   /**
    * Ozone administrator users delimited by comma.
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index e2231a5c38..bc90a87b11 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -4536,14 +4536,6 @@
       to existing buckets till this operation is completed.
     </description>
   </property>
-  <property>
-    <name>ozone.ec.grpc.zerocopy.enabled</name>
-    <value>true</value>
-    <tag>OZONE, DATANODE</tag>
-    <description>
-      Specify if zero-copy should be enabled for EC GRPC protocol.
-    </description>
-  </property>
   <property>
     <name>ozone.om.max.buckets</name>
     <value>100000</value>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
index 9c3f29d0f0..5f1914402d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.container.common.transport.server;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
-import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller;
 import org.apache.ratis.thirdparty.com.google.protobuf.MessageLite;
@@ -31,7 +30,6 @@ import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.InputStream;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.getSendMethod;
@@ -45,28 +43,20 @@ public class GrpcXceiverService extends
       LOG = LoggerFactory.getLogger(GrpcXceiverService.class);
 
   private final ContainerDispatcher dispatcher;
-  private final boolean zeroCopyEnabled;
   private final ZeroCopyMessageMarshaller<ContainerCommandRequestProto>
       zeroCopyMessageMarshaller = new ZeroCopyMessageMarshaller<>(
           ContainerCommandRequestProto.getDefaultInstance());
 
-  public GrpcXceiverService(ContainerDispatcher dispatcher,
-      boolean zeroCopyEnabled) {
+  public GrpcXceiverService(ContainerDispatcher dispatcher) {
     this.dispatcher = dispatcher;
-    this.zeroCopyEnabled = zeroCopyEnabled;
   }
 
   /**
-   * Bind service with zerocopy marshaller equipped for the `send` API if
-   * zerocopy is enabled.
+   * Bind service with zerocopy marshaller equipped for the `send` API.
    * @return  service definition.
    */
   public ServerServiceDefinition bindServiceWithZeroCopy() {
     ServerServiceDefinition orig = super.bindService();
-    if (!zeroCopyEnabled) {
-      LOG.info("Zerocopy is not enabled.");
-      return orig;
-    }
 
     ServerServiceDefinition.Builder builder =
         ServerServiceDefinition.builder(orig.getServiceDescriptor().getName());
@@ -117,10 +107,7 @@ public class GrpcXceiverService extends
           isClosed.set(true);
           responseObserver.onError(e);
         } finally {
-          InputStream popStream = zeroCopyMessageMarshaller.popStream(request);
-          if (popStream != null) {
-            IOUtils.close(LOG, popStream);
-          }
+          zeroCopyMessageMarshaller.release(request);
         }
       }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index 42daaa94be..624f153e87 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -67,9 +67,6 @@ import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_EC_GRPC_ZERO_COPY_ENABLED;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_EC_GRPC_ZERO_COPY_ENABLED_DEFAULT;
-
 /**
  * Creates a Grpc server endpoint that acts as the communication layer for
  * Ozone containers.
@@ -135,13 +132,9 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
       eventLoopGroup = new NioEventLoopGroup(poolSize / 10, factory);
       channelType = NioServerSocketChannel.class;
     }
-    final boolean zeroCopyEnabled = conf.getBoolean(
-        OZONE_EC_GRPC_ZERO_COPY_ENABLED,
-        OZONE_EC_GRPC_ZERO_COPY_ENABLED_DEFAULT);
 
     LOG.info("GrpcServer channel type {}", channelType.getSimpleName());
-    GrpcXceiverService xceiverService = new GrpcXceiverService(dispatcher,
-        zeroCopyEnabled);
+    GrpcXceiverService xceiverService = new GrpcXceiverService(dispatcher);
     NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
         .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
         .bossEventLoopGroup(eventLoopGroup)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
index 6bc237207b..26cd0d82a9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.ozone.container.replication;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.HashSet;
 import java.util.Set;
@@ -59,37 +58,24 @@ public class GrpcReplicationService extends
   private final ContainerReplicationSource source;
   private final ContainerImporter importer;
 
-  private final boolean zeroCopyEnabled;
-
   private final ZeroCopyMessageMarshaller<SendContainerRequest>
       sendContainerZeroCopyMessageMarshaller;
 
   private final ZeroCopyMessageMarshaller<CopyContainerRequestProto>
       copyContainerZeroCopyMessageMarshaller;
 
-  public GrpcReplicationService(ContainerReplicationSource source,
-      ContainerImporter importer, boolean zeroCopyEnabled) {
+  public GrpcReplicationService(ContainerReplicationSource source, ContainerImporter importer) {
     this.source = source;
     this.importer = importer;
-    this.zeroCopyEnabled = zeroCopyEnabled;
-
-    if (zeroCopyEnabled) {
-      sendContainerZeroCopyMessageMarshaller = new ZeroCopyMessageMarshaller<>(
-          SendContainerRequest.getDefaultInstance());
-      copyContainerZeroCopyMessageMarshaller = new ZeroCopyMessageMarshaller<>(
-          CopyContainerRequestProto.getDefaultInstance());
-    } else {
-      sendContainerZeroCopyMessageMarshaller = null;
-      copyContainerZeroCopyMessageMarshaller = null;
-    }
+
+    sendContainerZeroCopyMessageMarshaller = new ZeroCopyMessageMarshaller<>(
+            SendContainerRequest.getDefaultInstance());
+    copyContainerZeroCopyMessageMarshaller = new ZeroCopyMessageMarshaller<>(
+            CopyContainerRequestProto.getDefaultInstance());
   }
 
   public ServerServiceDefinition bindServiceWithZeroCopy() {
     ServerServiceDefinition orig = super.bindService();
-    if (!zeroCopyEnabled) {
-      LOG.info("Zerocopy is not enabled.");
-      return orig;
-    }
 
     Set<String> methodNames = new HashSet<>();
     ServerServiceDefinition.Builder builder =
@@ -155,14 +141,7 @@ public class GrpcReplicationService extends
     } finally {
       // output may have already been closed, ignore such errors
       IOUtils.cleanupWithLogger(LOG, outputStream);
-
-      if (copyContainerZeroCopyMessageMarshaller != null) {
-        InputStream popStream =
-            copyContainerZeroCopyMessageMarshaller.popStream(request);
-        if (popStream != null) {
-          IOUtils.cleanupWithLogger(LOG, popStream);
-        }
-      }
+      copyContainerZeroCopyMessageMarshaller.release(request);
     }
   }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
index b4e92a4a60..6ca474bdd8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
@@ -99,13 +99,12 @@ public class ReplicationServer {
         new LinkedBlockingQueue<>(replicationQueueLimit),
         threadFactory);
 
-    init(replicationConfig.isZeroCopyEnable());
+    init();
   }
 
-  public void init(boolean enableZeroCopy) {
+  public void init() {
     GrpcReplicationService grpcReplicationService = new GrpcReplicationService(
-        new OnDemandContainerReplicationSource(controller), importer,
-        enableZeroCopy);
+        new OnDemandContainerReplicationSource(controller), importer);
     NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
         .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
         .addService(ServerInterceptors.intercept(
@@ -203,11 +202,6 @@ public class ReplicationServer {
     static final String REPLICATION_OUTOFSERVICE_FACTOR_KEY =
         PREFIX + "." + OUTOFSERVICE_FACTOR_KEY;
 
-    public static final String ZEROCOPY_ENABLE_KEY = "zerocopy.enabled";
-    private static final boolean ZEROCOPY_ENABLE_DEFAULT = true;
-    private static final String ZEROCOPY_ENABLE_DEFAULT_VALUE =
-        "true";
-
     /**
      * The maximum number of replication commands a single datanode can execute
      * simultaneously.
@@ -249,15 +243,6 @@ public class ReplicationServer {
     )
     private double outOfServiceFactor = OUTOFSERVICE_FACTOR_DEFAULT;
 
-    @Config(key = ZEROCOPY_ENABLE_KEY,
-        type = ConfigType.BOOLEAN,
-        defaultValue =  ZEROCOPY_ENABLE_DEFAULT_VALUE,
-        tags = {DATANODE, SCM},
-        description = "Specify if zero-copy should be enabled for " +
-            "replication protocol."
-    )
-    private boolean zeroCopyEnable = ZEROCOPY_ENABLE_DEFAULT;
-
     public double getOutOfServiceFactor() {
       return outOfServiceFactor;
     }
@@ -291,14 +276,6 @@ public class ReplicationServer {
       this.replicationQueueLimit = limit;
     }
 
-    public boolean isZeroCopyEnable() {
-      return zeroCopyEnable;
-    }
-
-    public void setZeroCopyEnable(boolean zeroCopyEnable) {
-      this.zeroCopyEnable = zeroCopyEnable;
-    }
-
     @PostConstruct
     public void validate() {
       if (replicationMaxStreams < 1) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
index 506a96fe05..40b4dec349 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
@@ -30,7 +30,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -105,10 +104,7 @@ class SendContainerRequestHandler
       onError(t);
     } finally {
       if (marshaller != null) {
-        InputStream popStream = marshaller.popStream(req);
-        if (popStream != null) {
-          IOUtils.cleanupWithLogger(LOG, popStream);
-        }
+        marshaller.release(req);
       }
     }
   }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
index 03901b99be..bca98d1e7d 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
@@ -83,17 +83,15 @@ class TestGrpcReplicationService {
 
   @BeforeEach
   public void setUp() throws Exception {
-    init(false);
+    init();
   }
 
-  public void init(boolean isZeroCopy) throws Exception {
+  public void init() throws Exception {
     conf = new OzoneConfiguration();
 
     ReplicationServer.ReplicationConfig replicationConfig =
         conf.getObject(ReplicationServer.ReplicationConfig.class);
 
-    replicationConfig.setZeroCopyEnable(isZeroCopy);
-
     SecurityConfig secConf = new SecurityConfig(conf);
 
     ContainerSet containerSet = new ContainerSet(1000);
@@ -230,7 +228,7 @@ class TestGrpcReplicationService {
     };
     ContainerImporter importer = mock(ContainerImporter.class);
     GrpcReplicationService subject =
-        new GrpcReplicationService(source, importer, false);
+        new GrpcReplicationService(source, importer);
 
     CopyContainerRequestProto request = CopyContainerRequestProto.newBuilder()
         .setContainerID(1)
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationServiceWithZeroCopy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationServiceWithZeroCopy.java
deleted file mode 100644
index 00891cf3e2..0000000000
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationServiceWithZeroCopy.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.replication;
-
-import org.junit.jupiter.api.BeforeEach;
-
-/**
- * Tests {@link GrpcReplicationService}.
- */
-class TestGrpcReplicationServiceWithZeroCopy
-    extends TestGrpcReplicationService {
-  @BeforeEach
-  public void setUp() throws Exception {
-    init(true);
-  }
-}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/AbstractTestECKeyOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/AbstractTestECKeyOutputStream.java
deleted file mode 100644
index 3063e2587e..0000000000
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/AbstractTestECKeyOutputStream.java
+++ /dev/null
@@ -1,492 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.client.rpc;
-
-import org.apache.commons.lang3.NotImplementedException;
-import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
-import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.client.RatisReplicationConfig;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.conf.StorageUnit;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.utils.IOUtils;
-import org.apache.hadoop.ozone.ClientConfigForTesting;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.client.BucketArgs;
-import org.apache.hadoop.ozone.client.ObjectStore;
-import org.apache.hadoop.ozone.client.OzoneBucket;
-import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.client.OzoneClientFactory;
-import org.apache.hadoop.ozone.client.OzoneKey;
-import org.apache.hadoop.ozone.client.OzoneKeyDetails;
-import org.apache.hadoop.ozone.client.OzoneVolume;
-import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
-import org.apache.hadoop.ozone.client.io.KeyOutputStream;
-import org.apache.hadoop.ozone.client.io.OzoneInputStream;
-import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.container.TestHelper;
-import org.apache.ozone.test.GenericTestUtils;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.fail;
-
-/**
- * Tests key output stream.
- */
-abstract class AbstractTestECKeyOutputStream {
-  private static MiniOzoneCluster cluster;
-  private static OzoneConfiguration conf = new OzoneConfiguration();
-  private static OzoneClient client;
-  private static ObjectStore objectStore;
-  private static int chunkSize;
-  private static int flushSize;
-  private static int maxFlushSize;
-  private static int blockSize;
-  private static String volumeName;
-  private static String bucketName;
-  private static String keyString;
-  private static int dataBlocks = 3;
-  private static int inputSize = dataBlocks * chunkSize;
-  private static byte[][] inputChunks = new byte[dataBlocks][chunkSize];
-
-  /**
-   * Create a MiniDFSCluster for testing.
-   */
-  protected static void init(boolean zeroCopyEnabled) throws Exception {
-    chunkSize = 1024 * 1024;
-    flushSize = 2 * chunkSize;
-    maxFlushSize = 2 * flushSize;
-    blockSize = 2 * maxFlushSize;
-
-    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
-    clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE);
-    clientConfig.setStreamBufferFlushDelay(false);
-    conf.setFromObject(clientConfig);
-
-    // If SCM detects dead node too quickly, then container would be moved to
-    // closed state and all in progress writes will get exception. To avoid
-    // that, we are just keeping higher timeout and none of the tests depending
-    // on deadnode detection timeout currently.
-    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
-    conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 60, TimeUnit.SECONDS);
-    conf.setTimeDuration("hdds.ratis.raft.server.rpc.slowness.timeout", 300,
-        TimeUnit.SECONDS);
-    conf.setTimeDuration(
-        "hdds.ratis.raft.server.notification.no-leader.timeout", 300,
-        TimeUnit.SECONDS);
-    conf.setQuietMode(false);
-    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
-        StorageUnit.MB);
-    conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 500,
-        TimeUnit.MILLISECONDS);
-    conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1,
-        TimeUnit.SECONDS);
-    conf.setBoolean(OzoneConfigKeys.OZONE_EC_GRPC_ZERO_COPY_ENABLED,
-        zeroCopyEnabled);
-    conf.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 10);
-    // "Enable" hsync to verify that hsync would be blocked by ECKeyOutputStream
-    conf.setBoolean(OzoneConfigKeys.OZONE_HBASE_ENHANCEMENTS_ALLOWED, true);
-    conf.setBoolean("ozone.client.hbase.enhancements.allowed", true);
-    conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
-
-    ClientConfigForTesting.newBuilder(StorageUnit.BYTES)
-        .setBlockSize(blockSize)
-        .setChunkSize(chunkSize)
-        .setStreamBufferFlushSize(flushSize)
-        .setStreamBufferMaxSize(maxFlushSize)
-        .applyTo(conf);
-
-    cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(10)
-        .build();
-    cluster.waitForClusterToBeReady();
-    client = OzoneClientFactory.getRpcClient(conf);
-    objectStore = client.getObjectStore();
-    keyString = UUID.randomUUID().toString();
-    volumeName = "testeckeyoutputstream";
-    bucketName = volumeName;
-    objectStore.createVolume(volumeName);
-    objectStore.getVolume(volumeName).createBucket(bucketName);
-    initInputChunks();
-  }
-
-  @BeforeAll
-  public static void init() throws Exception {
-    init(false);
-  }
-
-  /**
-   * Shutdown MiniDFSCluster.
-   */
-  @AfterAll
-  public static void shutdown() {
-    IOUtils.closeQuietly(client);
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  @Test
-  public void testCreateKeyWithECReplicationConfig() throws Exception {
-    try (OzoneOutputStream key = TestHelper
-        .createKey(keyString, new ECReplicationConfig(3, 2,
-                ECReplicationConfig.EcCodec.RS, chunkSize), inputSize,
-            objectStore, volumeName, bucketName)) {
-      assertInstanceOf(ECKeyOutputStream.class, key.getOutputStream());
-    }
-  }
-
-  @Test
-  public void testCreateKeyWithOutBucketDefaults() throws Exception {
-    OzoneVolume volume = objectStore.getVolume(volumeName);
-    OzoneBucket bucket = volume.getBucket(bucketName);
-    try (OzoneOutputStream out = bucket.createKey("myKey", inputSize)) {
-      assertInstanceOf(KeyOutputStream.class, out.getOutputStream());
-      for (byte[] inputChunk : inputChunks) {
-        out.write(inputChunk);
-      }
-    }
-  }
-
-  @Test
-  public void testCreateKeyWithBucketDefaults() throws Exception {
-    String myBucket = UUID.randomUUID().toString();
-    OzoneVolume volume = objectStore.getVolume(volumeName);
-    final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
-    bucketArgs.setDefaultReplicationConfig(
-        new DefaultReplicationConfig(
-            new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
-                chunkSize)));
-
-    volume.createBucket(myBucket, bucketArgs.build());
-    OzoneBucket bucket = volume.getBucket(myBucket);
-
-    try (OzoneOutputStream out = bucket.createKey(keyString, inputSize)) {
-      assertInstanceOf(ECKeyOutputStream.class, out.getOutputStream());
-      for (byte[] inputChunk : inputChunks) {
-        out.write(inputChunk);
-      }
-    }
-    byte[] buf = new byte[chunkSize];
-    try (OzoneInputStream in = bucket.readKey(keyString)) {
-      for (byte[] inputChunk : inputChunks) {
-        int read = in.read(buf, 0, chunkSize);
-        assertEquals(chunkSize, read);
-        assertArrayEquals(buf, inputChunk);
-      }
-    }
-  }
-
-  @Test
-  public void testOverwriteECKeyWithRatisKey() throws Exception {
-    String myBucket = UUID.randomUUID().toString();
-    OzoneVolume volume = objectStore.getVolume(volumeName);
-    final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
-    volume.createBucket(myBucket, bucketArgs.build());
-    OzoneBucket bucket = volume.getBucket(myBucket);
-    createKeyAndCheckReplicationConfig(keyString, bucket,
-        new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
-            chunkSize));
-
-    //Overwrite with RATIS/THREE
-    createKeyAndCheckReplicationConfig(keyString, bucket,
-        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
-
-    //Overwrite with RATIS/ONE
-    createKeyAndCheckReplicationConfig(keyString, bucket,
-        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE));
-  }
-
-  @Test
-  public void testOverwriteRatisKeyWithECKey() throws Exception {
-    String myBucket = UUID.randomUUID().toString();
-    OzoneVolume volume = objectStore.getVolume(volumeName);
-    final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
-    volume.createBucket(myBucket, bucketArgs.build());
-    OzoneBucket bucket = volume.getBucket(myBucket);
-
-    createKeyAndCheckReplicationConfig(keyString, bucket,
-        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
-    // Overwrite with EC key
-    createKeyAndCheckReplicationConfig(keyString, bucket,
-        new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
-            chunkSize));
-  }
-
-  private void createKeyAndCheckReplicationConfig(String keyName,
-                                                  OzoneBucket bucket, ReplicationConfig replicationConfig)
-      throws IOException {
-    try (OzoneOutputStream out = bucket
-        .createKey(keyName, inputSize, replicationConfig, new HashMap<>())) {
-      for (byte[] inputChunk : inputChunks) {
-        out.write(inputChunk);
-      }
-    }
-    OzoneKeyDetails key = bucket.getKey(keyName);
-    assertEquals(replicationConfig, key.getReplicationConfig());
-  }
-
-  @Test
-  public void testCreateRatisKeyAndWithECBucketDefaults() throws Exception {
-    OzoneBucket bucket = getOzoneBucket();
-    try (OzoneOutputStream out = bucket.createKey(
-        "testCreateRatisKeyAndWithECBucketDefaults", 2000,
-        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
-        new HashMap<>())) {
-      assertInstanceOf(KeyOutputStream.class, out.getOutputStream());
-      for (byte[] inputChunk : inputChunks) {
-        out.write(inputChunk);
-      }
-    }
-  }
-
-  @Test
-  public void test13ChunksInSingleWriteOp() throws IOException {
-    testMultipleChunksInSingleWriteOp(13);
-  }
-
-  @Test
-  public void testChunksInSingleWriteOpWithOffset() throws IOException {
-    testMultipleChunksInSingleWriteOp(11, 25, 19);
-  }
-
-  @Test
-  public void test15ChunksInSingleWriteOp() throws IOException {
-    testMultipleChunksInSingleWriteOp(15);
-  }
-
-  @Test
-  public void test20ChunksInSingleWriteOp() throws IOException {
-    testMultipleChunksInSingleWriteOp(20);
-  }
-
-  @Test
-  public void test21ChunksInSingleWriteOp() throws IOException {
-    testMultipleChunksInSingleWriteOp(21);
-  }
-
-  private void testMultipleChunksInSingleWriteOp(int offset,
-                                                 int bufferChunks, int numChunks)
-      throws IOException {
-    byte[] inputData = getInputBytes(offset, bufferChunks, numChunks);
-    final OzoneBucket bucket = getOzoneBucket();
-    String keyName =
-        String.format("testMultipleChunksInSingleWriteOpOffset" +
-                "%dBufferChunks%dNumChunks", offset, bufferChunks,
-            numChunks);
-    try (OzoneOutputStream out = bucket.createKey(keyName, 4096,
-        new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
-            chunkSize), new HashMap<>())) {
-      out.write(inputData, offset, numChunks * chunkSize);
-    }
-
-    validateContent(offset, numChunks * chunkSize, inputData, bucket,
-        bucket.getKey(keyName));
-  }
-
-  private void testMultipleChunksInSingleWriteOp(int numChunks)
-      throws IOException {
-    testMultipleChunksInSingleWriteOp(0, numChunks, numChunks);
-  }
-
-  @Test
-  public void testECContainerKeysCountAndNumContainerReplicas()
-      throws IOException, InterruptedException, TimeoutException {
-    byte[] inputData = getInputBytes(1);
-    final OzoneBucket bucket = getOzoneBucket();
-    ContainerOperationClient containerOperationClient =
-        new ContainerOperationClient(conf);
-
-    ECReplicationConfig repConfig = new ECReplicationConfig(
-        3, 2, ECReplicationConfig.EcCodec.RS, chunkSize);
-    // Close all EC pipelines so we must get a fresh pipeline and hence
-    // container for this test.
-    PipelineManager pm =
-        cluster.getStorageContainerManager().getPipelineManager();
-    for (Pipeline p : pm.getPipelines(repConfig)) {
-      pm.closePipeline(p, true);
-    }
-
-    String keyName = UUID.randomUUID().toString();
-    try (OzoneOutputStream out = bucket.createKey(keyName, 4096,
-        repConfig, new HashMap<>())) {
-      out.write(inputData);
-    }
-    OzoneKeyDetails key = bucket.getKey(keyName);
-    long currentKeyContainerID =
-        key.getOzoneKeyLocations().get(0).getContainerID();
-
-    GenericTestUtils.waitFor(() -> {
-      try {
-        return (containerOperationClient.getContainer(currentKeyContainerID)
-            .getNumberOfKeys() == 1) && (containerOperationClient
-            .getContainerReplicas(currentKeyContainerID).size() == 5);
-      } catch (IOException exception) {
-        fail("Unexpected exception " + exception);
-        return false;
-      }
-    }, 100, 10000);
-    validateContent(inputData, bucket, key);
-  }
-
-  private void validateContent(byte[] inputData, OzoneBucket bucket,
-                               OzoneKey key) throws IOException {
-    validateContent(0, inputData.length, inputData, bucket, key);
-  }
-
-  private void validateContent(int offset, int length, byte[] inputData,
-                               OzoneBucket bucket,
-                               OzoneKey key) throws IOException {
-    try (OzoneInputStream is = bucket.readKey(key.getName())) {
-      byte[] fileContent = new byte[length];
-      assertEquals(length, is.read(fileContent));
-      assertEquals(new String(Arrays.copyOfRange(inputData, offset,
-              offset + length), UTF_8),
-          new String(fileContent, UTF_8));
-    }
-  }
-
-  private OzoneBucket getOzoneBucket() throws IOException {
-    String myBucket = UUID.randomUUID().toString();
-    OzoneVolume volume = objectStore.getVolume(volumeName);
-    final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
-    bucketArgs.setDefaultReplicationConfig(
-        new DefaultReplicationConfig(
-            new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
-                chunkSize)));
-
-    volume.createBucket(myBucket, bucketArgs.build());
-    return volume.getBucket(myBucket);
-  }
-
-  private static void initInputChunks() {
-    for (int i = 0; i < dataBlocks; i++) {
-      inputChunks[i] = getBytesWith(i + 1, chunkSize);
-    }
-  }
-
-  private static byte[] getBytesWith(int singleDigitNumber, int total) {
-    StringBuilder builder = new StringBuilder(singleDigitNumber);
-    for (int i = 1; i <= total; i++) {
-      builder.append(singleDigitNumber);
-    }
-    return builder.toString().getBytes(UTF_8);
-  }
-
-  @Test
-  public void testWriteShouldSucceedWhenDNKilled() throws Exception {
-    int numChunks = 3;
-    byte[] inputData = getInputBytes(numChunks);
-    final OzoneBucket bucket = getOzoneBucket();
-    String keyName = "testWriteShouldSucceedWhenDNKilled" + numChunks;
-    DatanodeDetails nodeToKill = null;
-    try {
-      try (OzoneOutputStream out = bucket.createKey(keyName, 1024,
-          new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
-              chunkSize), new HashMap<>())) {
-        ECKeyOutputStream ecOut = (ECKeyOutputStream) out.getOutputStream();
-        out.write(inputData);
-        // Kill a node from first pipeline
-        nodeToKill = ecOut.getStreamEntries()
-            .get(0).getPipeline().getFirstNode();
-        cluster.shutdownHddsDatanode(nodeToKill);
-
-        out.write(inputData);
-
-        // Wait for flushing thread to finish its work.
-        final long checkpoint = System.currentTimeMillis();
-        ecOut.insertFlushCheckpoint(checkpoint);
-        GenericTestUtils.waitFor(() -> ecOut.getFlushCheckpoint() == checkpoint,
-            100, 10000);
-
-        // Check the second blockGroup pipeline to make sure that the failed
-        // node is not selected.
-        assertThat(ecOut.getStreamEntries().get(1).getPipeline().getNodes())
-            .doesNotContain(nodeToKill);
-      }
-
-      try (OzoneInputStream is = bucket.readKey(keyName)) {
-        // We wrote "inputData" twice, so do two reads and ensure the correct
-        // data comes back.
-        for (int i = 0; i < 2; i++) {
-          byte[] fileContent = new byte[inputData.length];
-          assertEquals(inputData.length, is.read(fileContent));
-          assertEquals(new String(inputData, UTF_8),
-              new String(fileContent, UTF_8));
-        }
-      }
-    } finally {
-      cluster.restartHddsDatanode(nodeToKill, true);
-    }
-  }
-
-  private byte[] getInputBytes(int numChunks) {
-    return getInputBytes(0, numChunks, numChunks);
-  }
-
-  private byte[] getInputBytes(int offset, int bufferChunks, int numChunks) {
-    byte[] inputData = new byte[offset + bufferChunks * chunkSize];
-    for (int i = 0; i < numChunks; i++) {
-      int start = offset + (i * chunkSize);
-      Arrays.fill(inputData, start, start + chunkSize - 1,
-          String.valueOf(i % 9).getBytes(UTF_8)[0]);
-    }
-    return inputData;
-  }
-
-  @Test
-  public void testBlockedHflushAndHsync() throws Exception {
-    // Expect ECKeyOutputStream hflush and hsync calls to throw exception
-    try (OzoneOutputStream oOut = TestHelper.createKey(
-        keyString, new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, chunkSize),
-        inputSize, objectStore, volumeName, bucketName)) {
-      assertInstanceOf(ECKeyOutputStream.class, oOut.getOutputStream());
-      KeyOutputStream kOut = (KeyOutputStream) oOut.getOutputStream();
-
-      assertThrows(NotImplementedException.class, () -> kOut.hflush());
-      assertThrows(NotImplementedException.class, () -> kOut.hsync());
-    }
-  }
-
-}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index c5147ecfb0..5743866f2d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@ -17,15 +17,471 @@
 
 package org.apache.hadoop.ozone.client.rpc;
 
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.ozone.ClientConfigForTesting;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.BucketArgs;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneKey;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.TestHelper;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /**
- * Tests key output stream without zero-copy enabled.
+ * Tests key output stream.
  */
-public class TestECKeyOutputStream extends
-    AbstractTestECKeyOutputStream {
+public class TestECKeyOutputStream {
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf = new OzoneConfiguration();
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+  private static int chunkSize;
+  private static int flushSize;
+  private static int maxFlushSize;
+  private static int blockSize;
+  private static String volumeName;
+  private static String bucketName;
+  private static String keyString;
+  private static int dataBlocks = 3;
+  private static int inputSize = dataBlocks * chunkSize;
+  private static byte[][] inputChunks = new byte[dataBlocks][chunkSize];
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   */
   @BeforeAll
-  public static void init() throws Exception {
-    init(false);
+  protected static void init() throws Exception {
+    chunkSize = 1024 * 1024;
+    flushSize = 2 * chunkSize;
+    maxFlushSize = 2 * flushSize;
+    blockSize = 2 * maxFlushSize;
+
+    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+    clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE);
+    clientConfig.setStreamBufferFlushDelay(false);
+    conf.setFromObject(clientConfig);
+
+    // If SCM detects dead node too quickly, then container would be moved to
+    // closed state and all in progress writes will get exception. To avoid
+    // that, we are just keeping higher timeout and none of the tests depending
+    // on deadnode detection timeout currently.
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
+    conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 60, TimeUnit.SECONDS);
+    conf.setTimeDuration("hdds.ratis.raft.server.rpc.slowness.timeout", 300,
+        TimeUnit.SECONDS);
+    conf.setTimeDuration(
+        "hdds.ratis.raft.server.notification.no-leader.timeout", 300,
+        TimeUnit.SECONDS);
+    conf.setQuietMode(false);
+    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
+        StorageUnit.MB);
+    conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 500,
+        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1,
+        TimeUnit.SECONDS);
+    conf.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 10);
+    // "Enable" hsync to verify that hsync would be blocked by ECKeyOutputStream
+    conf.setBoolean(OzoneConfigKeys.OZONE_HBASE_ENHANCEMENTS_ALLOWED, true);
+    conf.setBoolean("ozone.client.hbase.enhancements.allowed", true);
+    conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
+
+    ClientConfigForTesting.newBuilder(StorageUnit.BYTES)
+        .setBlockSize(blockSize)
+        .setChunkSize(chunkSize)
+        .setStreamBufferFlushSize(flushSize)
+        .setStreamBufferMaxSize(maxFlushSize)
+        .applyTo(conf);
+
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(10)
+        .build();
+    cluster.waitForClusterToBeReady();
+    client = OzoneClientFactory.getRpcClient(conf);
+    objectStore = client.getObjectStore();
+    keyString = UUID.randomUUID().toString();
+    volumeName = "testeckeyoutputstream";
+    bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+    initInputChunks();
   }
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  @AfterAll
+  public static void shutdown() {
+    IOUtils.closeQuietly(client);
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testCreateKeyWithECReplicationConfig() throws Exception {
+    try (OzoneOutputStream key = TestHelper
+        .createKey(keyString, new ECReplicationConfig(3, 2,
+                ECReplicationConfig.EcCodec.RS, chunkSize), inputSize,
+            objectStore, volumeName, bucketName)) {
+      assertInstanceOf(ECKeyOutputStream.class, key.getOutputStream());
+    }
+  }
+
+  @Test
+  public void testCreateKeyWithOutBucketDefaults() throws Exception {
+    OzoneVolume volume = objectStore.getVolume(volumeName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+    try (OzoneOutputStream out = bucket.createKey("myKey", inputSize)) {
+      assertInstanceOf(KeyOutputStream.class, out.getOutputStream());
+      for (byte[] inputChunk : inputChunks) {
+        out.write(inputChunk);
+      }
+    }
+  }
+
+  @Test
+  public void testCreateKeyWithBucketDefaults() throws Exception {
+    String myBucket = UUID.randomUUID().toString();
+    OzoneVolume volume = objectStore.getVolume(volumeName);
+    final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
+    bucketArgs.setDefaultReplicationConfig(
+        new DefaultReplicationConfig(
+            new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+                chunkSize)));
+
+    volume.createBucket(myBucket, bucketArgs.build());
+    OzoneBucket bucket = volume.getBucket(myBucket);
+
+    try (OzoneOutputStream out = bucket.createKey(keyString, inputSize)) {
+      assertInstanceOf(ECKeyOutputStream.class, out.getOutputStream());
+      for (byte[] inputChunk : inputChunks) {
+        out.write(inputChunk);
+      }
+    }
+    byte[] buf = new byte[chunkSize];
+    try (OzoneInputStream in = bucket.readKey(keyString)) {
+      for (byte[] inputChunk : inputChunks) {
+        int read = in.read(buf, 0, chunkSize);
+        assertEquals(chunkSize, read);
+        assertArrayEquals(buf, inputChunk);
+      }
+    }
+  }
+
+  @Test
+  public void testOverwriteECKeyWithRatisKey() throws Exception {
+    String myBucket = UUID.randomUUID().toString();
+    OzoneVolume volume = objectStore.getVolume(volumeName);
+    final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
+    volume.createBucket(myBucket, bucketArgs.build());
+    OzoneBucket bucket = volume.getBucket(myBucket);
+    createKeyAndCheckReplicationConfig(keyString, bucket,
+        new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+            chunkSize));
+
+    //Overwrite with RATIS/THREE
+    createKeyAndCheckReplicationConfig(keyString, bucket,
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
+
+    //Overwrite with RATIS/ONE
+    createKeyAndCheckReplicationConfig(keyString, bucket,
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE));
+  }
+
+  @Test
+  public void testOverwriteRatisKeyWithECKey() throws Exception {
+    String myBucket = UUID.randomUUID().toString();
+    OzoneVolume volume = objectStore.getVolume(volumeName);
+    final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
+    volume.createBucket(myBucket, bucketArgs.build());
+    OzoneBucket bucket = volume.getBucket(myBucket);
+
+    createKeyAndCheckReplicationConfig(keyString, bucket,
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
+    // Overwrite with EC key
+    createKeyAndCheckReplicationConfig(keyString, bucket,
+        new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+            chunkSize));
+  }
+
+  private void createKeyAndCheckReplicationConfig(String keyName,
+                                                  OzoneBucket bucket, ReplicationConfig replicationConfig)
+      throws IOException {
+    try (OzoneOutputStream out = bucket
+        .createKey(keyName, inputSize, replicationConfig, new HashMap<>())) {
+      for (byte[] inputChunk : inputChunks) {
+        out.write(inputChunk);
+      }
+    }
+    OzoneKeyDetails key = bucket.getKey(keyName);
+    assertEquals(replicationConfig, key.getReplicationConfig());
+  }
+
+  @Test
+  public void testCreateRatisKeyAndWithECBucketDefaults() throws Exception {
+    OzoneBucket bucket = getOzoneBucket();
+    try (OzoneOutputStream out = bucket.createKey(
+        "testCreateRatisKeyAndWithECBucketDefaults", 2000,
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
+        new HashMap<>())) {
+      assertInstanceOf(KeyOutputStream.class, out.getOutputStream());
+      for (byte[] inputChunk : inputChunks) {
+        out.write(inputChunk);
+      }
+    }
+  }
+
+  @Test
+  public void test13ChunksInSingleWriteOp() throws IOException {
+    testMultipleChunksInSingleWriteOp(13);
+  }
+
+  @Test
+  public void testChunksInSingleWriteOpWithOffset() throws IOException {
+    testMultipleChunksInSingleWriteOp(11, 25, 19);
+  }
+
+  @Test
+  public void test15ChunksInSingleWriteOp() throws IOException {
+    testMultipleChunksInSingleWriteOp(15);
+  }
+
+  @Test
+  public void test20ChunksInSingleWriteOp() throws IOException {
+    testMultipleChunksInSingleWriteOp(20);
+  }
+
+  @Test
+  public void test21ChunksInSingleWriteOp() throws IOException {
+    testMultipleChunksInSingleWriteOp(21);
+  }
+
+  private void testMultipleChunksInSingleWriteOp(int offset,
+                                                 int bufferChunks, int numChunks)
+      throws IOException {
+    byte[] inputData = getInputBytes(offset, bufferChunks, numChunks);
+    final OzoneBucket bucket = getOzoneBucket();
+    String keyName =
+        String.format("testMultipleChunksInSingleWriteOpOffset" +
+                "%dBufferChunks%dNumChunks", offset, bufferChunks,
+            numChunks);
+    try (OzoneOutputStream out = bucket.createKey(keyName, 4096,
+        new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+            chunkSize), new HashMap<>())) {
+      out.write(inputData, offset, numChunks * chunkSize);
+    }
+
+    validateContent(offset, numChunks * chunkSize, inputData, bucket,
+        bucket.getKey(keyName));
+  }
+
+  private void testMultipleChunksInSingleWriteOp(int numChunks)
+      throws IOException {
+    testMultipleChunksInSingleWriteOp(0, numChunks, numChunks);
+  }
+
+  @Test
+  public void testECContainerKeysCountAndNumContainerReplicas()
+      throws IOException, InterruptedException, TimeoutException {
+    byte[] inputData = getInputBytes(1);
+    final OzoneBucket bucket = getOzoneBucket();
+    ContainerOperationClient containerOperationClient =
+        new ContainerOperationClient(conf);
+
+    ECReplicationConfig repConfig = new ECReplicationConfig(
+        3, 2, ECReplicationConfig.EcCodec.RS, chunkSize);
+    // Close all EC pipelines so we must get a fresh pipeline and hence
+    // container for this test.
+    PipelineManager pm =
+        cluster.getStorageContainerManager().getPipelineManager();
+    for (Pipeline p : pm.getPipelines(repConfig)) {
+      pm.closePipeline(p, true);
+    }
+
+    String keyName = UUID.randomUUID().toString();
+    try (OzoneOutputStream out = bucket.createKey(keyName, 4096,
+        repConfig, new HashMap<>())) {
+      out.write(inputData);
+    }
+    OzoneKeyDetails key = bucket.getKey(keyName);
+    long currentKeyContainerID =
+        key.getOzoneKeyLocations().get(0).getContainerID();
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return (containerOperationClient.getContainer(currentKeyContainerID)
+            .getNumberOfKeys() == 1) && (containerOperationClient
+            .getContainerReplicas(currentKeyContainerID).size() == 5);
+      } catch (IOException exception) {
+        fail("Unexpected exception " + exception);
+        return false;
+      }
+    }, 100, 10000);
+    validateContent(inputData, bucket, key);
+  }
+
+  private void validateContent(byte[] inputData, OzoneBucket bucket,
+                               OzoneKey key) throws IOException {
+    validateContent(0, inputData.length, inputData, bucket, key);
+  }
+
+  private void validateContent(int offset, int length, byte[] inputData,
+                               OzoneBucket bucket,
+                               OzoneKey key) throws IOException {
+    try (OzoneInputStream is = bucket.readKey(key.getName())) {
+      byte[] fileContent = new byte[length];
+      assertEquals(length, is.read(fileContent));
+      assertEquals(new String(Arrays.copyOfRange(inputData, offset,
+              offset + length), UTF_8),
+          new String(fileContent, UTF_8));
+    }
+  }
+
+  private OzoneBucket getOzoneBucket() throws IOException {
+    String myBucket = UUID.randomUUID().toString();
+    OzoneVolume volume = objectStore.getVolume(volumeName);
+    final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
+    bucketArgs.setDefaultReplicationConfig(
+        new DefaultReplicationConfig(
+            new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+                chunkSize)));
+
+    volume.createBucket(myBucket, bucketArgs.build());
+    return volume.getBucket(myBucket);
+  }
+
+  private static void initInputChunks() {
+    for (int i = 0; i < dataBlocks; i++) {
+      inputChunks[i] = getBytesWith(i + 1, chunkSize);
+    }
+  }
+
+  private static byte[] getBytesWith(int singleDigitNumber, int total) {
+    StringBuilder builder = new StringBuilder(singleDigitNumber);
+    for (int i = 1; i <= total; i++) {
+      builder.append(singleDigitNumber);
+    }
+    return builder.toString().getBytes(UTF_8);
+  }
+
+  @Test
+  public void testWriteShouldSucceedWhenDNKilled() throws Exception {
+    int numChunks = 3;
+    byte[] inputData = getInputBytes(numChunks);
+    final OzoneBucket bucket = getOzoneBucket();
+    String keyName = "testWriteShouldSucceedWhenDNKilled" + numChunks;
+    DatanodeDetails nodeToKill = null;
+    try {
+      try (OzoneOutputStream out = bucket.createKey(keyName, 1024,
+          new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+              chunkSize), new HashMap<>())) {
+        ECKeyOutputStream ecOut = (ECKeyOutputStream) out.getOutputStream();
+        out.write(inputData);
+        // Kill a node from first pipeline
+        nodeToKill = ecOut.getStreamEntries()
+            .get(0).getPipeline().getFirstNode();
+        cluster.shutdownHddsDatanode(nodeToKill);
+
+        out.write(inputData);
+
+        // Wait for flushing thread to finish its work.
+        final long checkpoint = System.currentTimeMillis();
+        ecOut.insertFlushCheckpoint(checkpoint);
+        GenericTestUtils.waitFor(() -> ecOut.getFlushCheckpoint() == checkpoint,
+            100, 10000);
+
+        // Check the second blockGroup pipeline to make sure that the failed
+        // node is not selected.
+        assertThat(ecOut.getStreamEntries().get(1).getPipeline().getNodes())
+            .doesNotContain(nodeToKill);
+      }
+
+      try (OzoneInputStream is = bucket.readKey(keyName)) {
+        // We wrote "inputData" twice, so do two reads and ensure the correct
+        // data comes back.
+        for (int i = 0; i < 2; i++) {
+          byte[] fileContent = new byte[inputData.length];
+          assertEquals(inputData.length, is.read(fileContent));
+          assertEquals(new String(inputData, UTF_8),
+              new String(fileContent, UTF_8));
+        }
+      }
+    } finally {
+      cluster.restartHddsDatanode(nodeToKill, true);
+    }
+  }
+
+  private byte[] getInputBytes(int numChunks) {
+    return getInputBytes(0, numChunks, numChunks);
+  }
+
+  private byte[] getInputBytes(int offset, int bufferChunks, int numChunks) {
+    byte[] inputData = new byte[offset + bufferChunks * chunkSize];
+    for (int i = 0; i < numChunks; i++) {
+      int start = offset + (i * chunkSize);
+      Arrays.fill(inputData, start, start + chunkSize - 1,
+          String.valueOf(i % 9).getBytes(UTF_8)[0]);
+    }
+    return inputData;
+  }
+
+  @Test
+  public void testBlockedHflushAndHsync() throws Exception {
+    // Expect ECKeyOutputStream hflush and hsync calls to throw exception
+    try (OzoneOutputStream oOut = TestHelper.createKey(
+        keyString, new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, chunkSize),
+        inputSize, objectStore, volumeName, bucketName)) {
+      assertInstanceOf(ECKeyOutputStream.class, oOut.getOutputStream());
+      KeyOutputStream kOut = (KeyOutputStream) oOut.getOutputStream();
+
+      assertThrows(NotImplementedException.class, () -> kOut.hflush());
+      assertThrows(NotImplementedException.class, () -> kOut.hsync());
+    }
+  }
+
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStreamWithZeroCopy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStreamWithZeroCopy.java
deleted file mode 100644
index 47c94e03cb..0000000000
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStreamWithZeroCopy.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.ozone.client.rpc;
-
-import org.junit.jupiter.api.BeforeAll;
-
-/**
- * Tests key output stream with zero-copy enabled.
- */
-public class TestECKeyOutputStreamWithZeroCopy extends
-    AbstractTestECKeyOutputStream {
-  @BeforeAll
-  public static void init() throws Exception {
-    init(true);
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
