This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new 4115827  HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes. (#2335)
4115827 is described below

commit 4115827ff9cb90bfc1ec6f4863a0a800d80b82c0
Author: Uma Maheswara Rao G <[email protected]>
AuthorDate: Mon Jul 12 09:18:20 2021 -0700

    HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes. (#2335)
---
 .../hadoop/hdds/scm/storage/BlockInputStream.java  |   3 +-
 .../hadoop/hdds/scm/storage/BlockOutputStream.java |  28 +-
 .../hdds/scm/storage/ECBlockOutputStream.java      | 118 ++++
 hadoop-ozone/client/pom.xml                        |   4 +
 .../ozone/client/io/BlockOutputStreamEntry.java    |  16 +-
 .../client/io/BlockOutputStreamEntryPool.java      |  39 +-
 .../ozone/client/io/ECBlockOutputStreamEntry.java  | 126 ++++
 .../client/io/ECBlockOutputStreamEntryPool.java    | 149 +++++
 .../hadoop/ozone/client/io/ECKeyOutputStream.java  | 648 +++++++++++++++++++++
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  30 +-
 .../hadoop/ozone/client/MockDatanodeStorage.java   |   7 +-
 .../hadoop/ozone/client/MockOmTransport.java       |   4 +
 .../ozone/client/MockXceiverClientFactory.java     |   9 +-
 .../client/MultiNodePipelineBlockAllocator.java    |  71 +++
 .../hadoop/ozone/client/TestOzoneECClient.java     | 274 +++++++++
 15 files changed, 1497 insertions(+), 29 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index def810f..fea14bf 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -199,7 +199,8 @@ public class BlockInputStream extends InputStream
   protected List<ChunkInfo> getChunkInfos() throws IOException {
     // irrespective of the container state, we will always read via Standalone
     // protocol.
-    if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
+    if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE && pipeline
+        .getType() != HddsProtos.ReplicationType.EC) {
       pipeline = Pipeline.newBuilder(pipeline)
           .setReplicationConfig(new StandaloneReplicationConfig(
               ReplicationConfig
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 92ce3c6..98bd157 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -219,6 +219,22 @@ public class BlockOutputStream extends OutputStream {
     return ioException.get();
   }
 
+  /** Returns the {@code xceiverClient} field of this stream. */
+  XceiverClientSpi getXceiverClientSpi() {
+    return xceiverClient;
+  }
+
+  /** Returns the {@code containerBlockData} builder held by this stream. */
+  BlockData.Builder getContainerBlockData() {
+    return containerBlockData;
+  }
+
+  /** Returns the {@code token} field of this stream. */
+  Token<? extends TokenIdentifier> getToken() {
+    return token;
+  }
+
+  /** Returns the {@code responseExecutor} field of this stream. */
+  ExecutorService getResponseExecutor() {
+    return responseExecutor;
+  }
+
   @VisibleForTesting
   public Map<Long, List<ChunkBuffer>> getCommitIndex2flushedDataMap() {
     return commitWatcher.getCommitIndex2flushedDataMap();
@@ -405,7 +421,7 @@ public class BlockOutputStream extends OutputStream {
    * @param force true if no data was written since most recent putBlock and
    *            stream is being closed
    */
-  private CompletableFuture<ContainerProtos.
+  CompletableFuture<ContainerProtos.
       ContainerCommandResponseProto> executePutBlock(boolean close,
       boolean force) throws IOException {
     checkOpen();
@@ -573,7 +589,7 @@ public class BlockOutputStream extends OutputStream {
     combinedFuture.get();
   }
 
-  private void validateResponse(
+  void validateResponse(
       ContainerProtos.ContainerCommandResponseProto responseProto)
       throws IOException {
     try {
@@ -592,7 +608,7 @@ public class BlockOutputStream extends OutputStream {
   }
 
 
-  private void setIoException(Exception e) {
+  void setIoException(Exception e) {
     IOException ioe = getIoException();
     if (ioe == null) {
       IOException exception =  new IOException(EXCEPTION_MSG + e.toString(), e);
@@ -624,7 +640,7 @@ public class BlockOutputStream extends OutputStream {
    *
    * @throws IOException if stream is closed
    */
-  private void checkOpen() throws IOException {
+  void checkOpen() throws IOException {
     if (isClosed()) {
       throw new IOException("BlockOutputStream has been closed.");
     } else if (getIoException() != null) {
@@ -645,7 +661,7 @@ public class BlockOutputStream extends OutputStream {
    * @throws OzoneChecksumException if there is an error while computing
    * checksum
    */
-  private void writeChunkToContainer(ChunkBuffer chunk) throws IOException {
+  void writeChunkToContainer(ChunkBuffer chunk) throws IOException {
     int effectiveChunkSize = chunk.remaining();
     final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
     final ByteString data = chunk.toByteString(
@@ -705,7 +721,7 @@ public class BlockOutputStream extends OutputStream {
    * handle ExecutionException else skip it.
    * @throws IOException
    */
-  private void handleInterruptedException(Exception ex,
+  void handleInterruptedException(Exception ex,
       boolean processExecutionException)
       throws IOException {
     LOG.error("Command execution was interrupted.");
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
new file mode 100644
index 0000000..30bd2b7
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.common.ChunkBuffer;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
+
+/**
+ * Handles the chunk EC writes for an EC internal block.
+ */
+public class ECBlockOutputStream extends BlockOutputStream {
+
+  /**
+   * Creates a new ECBlockOutputStream.
+   *
+   * @param blockID              block ID
+   * @param xceiverClientManager client manager that controls client
+   * @param pipeline             pipeline where block will be written
+   * @param bufferPool           pool of buffers
+   * @param config               client configuration
+   * @param token                security token for the block
+   * @throws IOException if the parent stream cannot be created
+   */
+  public ECBlockOutputStream(
+      BlockID blockID,
+      XceiverClientFactory xceiverClientManager,
+      Pipeline pipeline,
+      BufferPool bufferPool,
+      OzoneClientConfig config,
+      Token<? extends TokenIdentifier> token
+  ) throws IOException {
+    super(blockID, xceiverClientManager,
+        pipeline, bufferPool, config, token);
+  }
+
+  /**
+   * Wraps the given range of {@code b} and passes it directly to
+   * {@code writeChunkToContainer}.
+   *
+   * @param b   source array
+   * @param off start offset in {@code b}
+   * @param len number of bytes to write
+   * @throws IOException if writing the chunk fails
+   */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    writeChunkToContainer(ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)));
+  }
+
+  /**
+   * Sends the block data built so far to the container via putBlockAsync.
+   *
+   * @param close whether putBlock is happening as part of closing the stream
+   * @param force true if no data was written since most recent putBlock and
+   *            stream is being closed (not used by this implementation)
+   * @return future of the putBlock response; the response is validated on
+   *         the response executor, and on failure the stream's ioException
+   *         is set and the future completes exceptionally
+   * @throws IOException if the stream is not open or the request could not
+   *                     be submitted
+   */
+  @Override
+  public CompletableFuture<ContainerProtos.
+      ContainerCommandResponseProto> executePutBlock(boolean close,
+      boolean force) throws IOException {
+    checkOpen();
+
+    CompletableFuture<ContainerProtos.
+        ContainerCommandResponseProto> flushFuture = null;
+    try {
+      ContainerProtos.BlockData blockData = getContainerBlockData().build();
+      XceiverClientReply asyncReply =
+          putBlockAsync(getXceiverClient(), blockData, close, getToken());
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
+          asyncReply.getResponse();
+      flushFuture = future.thenApplyAsync(e -> {
+        try {
+          validateResponse(e);
+        } catch (IOException sce) {
+          // Lambdas cannot throw checked exceptions; surface it through the
+          // future instead.
+          throw new CompletionException(sce);
+        }
+        // if the ioException is not set, putBlock is successful
+        if (getIoException() == null) {
+          BlockID responseBlockID = BlockID.getFromProtobuf(
+              e.getPutBlock().getCommittedBlockLength().getBlockID());
+          Preconditions.checkState(getBlockID().getContainerBlockID()
+              .equals(responseBlockID.getContainerBlockID()));
+        }
+        return e;
+      }, getResponseExecutor()).exceptionally(e -> {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("putBlock failed for blockID {} with exception {}",
+              getBlockID(), e.getLocalizedMessage());
+        }
+        CompletionException ce =  new CompletionException(e);
+        setIoException(ce);
+        throw ce;
+      });
+    } catch (IOException | ExecutionException e) {
+      throw new IOException(EXCEPTION_MSG + e.toString(), e);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      handleInterruptedException(ex, false);
+    }
+    return flushFuture;
+  }
+}
diff --git a/hadoop-ozone/client/pom.xml b/hadoop-ozone/client/pom.xml
index e7635d0..56ebe56 100644
--- a/hadoop-ozone/client/pom.xml
+++ b/hadoop-ozone/client/pom.xml
@@ -43,6 +43,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd";>
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+      <!-- NOTE(review): presumably added for SystemErasureCodingPolicies,
+           which ECKeyOutputStream imports - confirm it is still required -->
+      <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs-client</artifactId>
+      </dependency>
   </dependencies>
 
   <build>
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 594bbf0..21c88cf 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -37,7 +37,7 @@ import com.google.common.annotations.VisibleForTesting;
 /**
  * Helper class used inside {@link BlockOutputStream}.
  * */
-public final class BlockOutputStreamEntry extends OutputStream {
+public class BlockOutputStreamEntry extends OutputStream {
 
   private final OzoneClientConfig config;
   private OutputStream outputStream;
@@ -54,7 +54,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
   private BufferPool bufferPool;
 
   @SuppressWarnings({"parameternumber", "squid:S00107"})
-  private BlockOutputStreamEntry(
+  BlockOutputStreamEntry(
       BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager,
       Pipeline pipeline,
@@ -95,12 +95,14 @@ public final class BlockOutputStreamEntry extends OutputStream {
    */
   private void checkStream() throws IOException {
     if (this.outputStream == null) {
-      this.outputStream =
-          new BlockOutputStream(blockID, xceiverClientManager,
-              pipeline, bufferPool, config, token);
+      this.outputStream = createOutputStream();
     }
   }
 
+  /**
+   * Builds the stream this entry writes to, using the entry's block ID,
+   * client factory, pipeline, buffer pool, config and token. Kept
+   * overridable so a subclass can supply another stream implementation.
+   *
+   * @return a new {@link BlockOutputStream}
+   * @throws IOException if the stream cannot be created
+   */
+  BlockOutputStream createOutputStream() throws IOException {
+    return new BlockOutputStream(
+        blockID, xceiverClientManager, pipeline, bufferPool, config, token);
+  }
 
   @Override
   public void write(int b) throws IOException {
@@ -270,6 +272,10 @@ public final class BlockOutputStreamEntry extends OutputStream {
     return key;
   }
 
+  /** Returns the client configuration this entry was created with. */
+  public OzoneClientConfig getConf() {
+    return config;
+  }
+
   public XceiverClientFactory getXceiverClientManager() {
     return xceiverClientManager;
   }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index e9147a8..0af651c 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -145,7 +145,7 @@ public class BlockOutputStreamEntryPool {
     }
   }
 
-  private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
+  void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
     Preconditions.checkNotNull(subKeyInfo.getPipeline());
     BlockOutputStreamEntry.Builder builder =
         new BlockOutputStreamEntry.Builder()
@@ -160,9 +160,18 @@ public class BlockOutputStreamEntryPool {
     streamEntries.add(builder.build());
   }
 
-  public List<OmKeyLocationInfo> getLocationInfoList()  {
+  /**
+   * Returns the location infos computed from the current stream entries.
+   *
+   * @return result of {@code getOmKeyLocationInfos} for the stream entries
+   */
+  public List<OmKeyLocationInfo> getLocationInfoList() {
+    return getOmKeyLocationInfos(streamEntries);
+  }
+
+  List<OmKeyLocationInfo> getOmKeyLocationInfos(
+      List<BlockOutputStreamEntry> streams) {
     List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
-    for (BlockOutputStreamEntry streamEntry : streamEntries) {
+    for (BlockOutputStreamEntry streamEntry : streams) {
       long length = streamEntry.getCurrentPosition();
 
       // Commit only those blocks to OzoneManager which are not empty
@@ -184,6 +193,14 @@ public class BlockOutputStreamEntryPool {
     return locationInfoList;
   }
 
+  /** Returns the buffer pool held by this entry pool. */
+  public BufferPool getBufferPool() {
+    return bufferPool;
+  }
+
+  /** Returns the client configuration held by this entry pool. */
+  public OzoneClientConfig getConfig() {
+    return this.config;
+  }
+
   /**
    * Discards the subsequent pre allocated blocks and removes the streamEntries
    * from the streamEntries list for the container which is closed.
@@ -224,8 +241,8 @@ public class BlockOutputStreamEntryPool {
   }
 
   long getKeyLength() {
-    return streamEntries.stream().mapToLong(
-        BlockOutputStreamEntry::getCurrentPosition).sum();
+    return streamEntries.stream()
+        .mapToLong(BlockOutputStreamEntry::getCurrentPosition).sum();
   }
   /**
    * Contact OM to get a new block. Set the new block with the index (e.g.
@@ -274,6 +291,18 @@ public class BlockOutputStreamEntryPool {
     }
   }
 
+  /** Returns the index of the current stream entry. */
+  public int getCurrIdx() {
+    return this.currentStreamIndex;
+  }
+
+  /**
+   * Sets the index of the current stream entry.
+   *
+   * @param currIdx new value for the current stream index
+   */
+  public void setCurrIdx(int currIdx) {
+    currentStreamIndex = currIdx;
+  }
+
+  public void updateToNextStream(int rotation){
+    currentStreamIndex = (currentStreamIndex+1) % rotation;
+  }
+
   BlockOutputStreamEntry allocateBlockIfNeeded() throws IOException {
     BlockOutputStreamEntry streamEntry = getCurrentStreamEntry();
     if (streamEntry != null && streamEntry.isClosed()) {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
new file mode 100644
index 0000000..383ed17
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
+import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+import java.io.IOException;
+
+/**
+ * Helper for {@link ECBlockOutputStream}.
+ *
+ * <p>Creates an {@link ECBlockOutputStream} as the entry's stream and keeps
+ * a typed reference to it so that a putBlock can be triggered explicitly.
+ */
+public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry {
+  // Assigned by createOutputStream(); null until that method has run.
+  private ECBlockOutputStream out;
+
+  @SuppressWarnings({"parameternumber", "squid:S00107"})
+  ECBlockOutputStreamEntry(BlockID blockID, String key,
+      XceiverClientFactory xceiverClientManager, Pipeline pipeline,
+      long length, BufferPool bufferPool,
+      Token<OzoneBlockTokenIdentifier> token, OzoneClientConfig config) {
+    super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
+        token, config);
+  }
+
+  /**
+   * Creates the EC stream for this entry and remembers it for
+   * {@link #executePutBlock()}.
+   *
+   * @return the newly created {@link ECBlockOutputStream}
+   * @throws IOException if the stream cannot be created
+   */
+  @Override
+  ECBlockOutputStream createOutputStream() throws IOException {
+    this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
+        getPipeline(), getBufferPool(), getConf(), getToken());
+    return this.out;
+  }
+
+  /**
+   * Issues a putBlock on the underlying EC stream. Does nothing when the
+   * stream has not been created yet.
+   *
+   * @throws IOException if the putBlock request fails
+   */
+  void executePutBlock() throws IOException {
+    if (out == null) {
+      // createOutputStream() has not run for this entry, so there is no
+      // stream to send a putBlock on.
+      return;
+    }
+    out.executePutBlock(false, true);
+  }
+
+  /**
+   * Builder class for ECBlockOutputStreamEntry.
+   * */
+  public static class Builder {
+
+    private BlockID blockID;
+    private String key;
+    private XceiverClientFactory xceiverClientManager;
+    private Pipeline pipeline;
+    private long length;
+    private BufferPool bufferPool;
+    private Token<OzoneBlockTokenIdentifier> token;
+    private OzoneClientConfig config;
+
+    public ECBlockOutputStreamEntry.Builder setBlockID(BlockID bID) {
+      this.blockID = bID;
+      return this;
+    }
+
+    public ECBlockOutputStreamEntry.Builder setKey(String keys) {
+      this.key = keys;
+      return this;
+    }
+
+    public ECBlockOutputStreamEntry.Builder setXceiverClientManager(
+        XceiverClientFactory xClientManager) {
+      this.xceiverClientManager = xClientManager;
+      return this;
+    }
+
+    public ECBlockOutputStreamEntry.Builder setPipeline(Pipeline ppln) {
+      this.pipeline = ppln;
+      return this;
+    }
+
+    public ECBlockOutputStreamEntry.Builder setLength(long len) {
+      this.length = len;
+      return this;
+    }
+
+    public ECBlockOutputStreamEntry.Builder setBufferPool(BufferPool pool) {
+      this.bufferPool = pool;
+      return this;
+    }
+
+    public ECBlockOutputStreamEntry.Builder setConfig(
+        OzoneClientConfig clientConfig) {
+      this.config = clientConfig;
+      return this;
+    }
+
+    public ECBlockOutputStreamEntry.Builder setToken(
+        Token<OzoneBlockTokenIdentifier> bToken) {
+      this.token = bToken;
+      return this;
+    }
+
+    /**
+     * @return a new entry built from the values set on this builder
+     */
+    public ECBlockOutputStreamEntry build() {
+      return new ECBlockOutputStreamEntry(blockID,
+          key,
+          xceiverClientManager,
+          pipeline,
+          length,
+          bufferPool,
+          token, config);
+    }
+  }
+}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
new file mode 100644
index 0000000..ca76b75
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class manages the stream entries list and handles block allocation
+ * from OzoneManager for EC writes.
+ */
+public class ECBlockOutputStreamEntryPool extends BlockOutputStreamEntryPool {
+  // Entries moved out of the active list by endECBlock().
+  private final List<BlockOutputStreamEntry> finishedStreamEntries;
+  private final ECReplicationConfig ecReplicationConfig;
+
+  /**
+   * Creates the pool; all arguments are forwarded to the parent pool.
+   *
+   * @throws IllegalArgumentException if {@code replicationConfig} is not an
+   *                                  {@link ECReplicationConfig}
+   */
+  @SuppressWarnings({"parameternumber", "squid:S00107"})
+  public ECBlockOutputStreamEntryPool(OzoneClientConfig config,
+      OzoneManagerProtocol omClient,
+      String requestId,
+      ReplicationConfig replicationConfig,
+      String uploadID,
+      int partNumber,
+      boolean isMultipart,
+      OmKeyInfo info,
+      boolean unsafeByteBufferConversion,
+      XceiverClientFactory xceiverClientFactory,
+      long openID) {
+    super(config, omClient, requestId, replicationConfig, uploadID, partNumber,
+        isMultipart, info, unsafeByteBufferConversion, xceiverClientFactory,
+        openID);
+    // A Java assert is skipped unless the JVM runs with -ea, so validate
+    // explicitly before casting.
+    Preconditions.checkArgument(
+        replicationConfig instanceof ECReplicationConfig,
+        "Expected an ECReplicationConfig but got %s", replicationConfig);
+    this.finishedStreamEntries = new ArrayList<>();
+    this.ecReplicationConfig = (ECReplicationConfig) replicationConfig;
+  }
+
+  /**
+   * Adds one stream entry per node of the block's pipeline. Each entry gets
+   * its own single-node pipeline that reuses the id, replication config and
+   * state of the block's pipeline.
+   *
+   * @param subKeyInfo location info of the allocated block; its pipeline
+   *                   must not be null
+   */
+  @Override
+  void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
+    Pipeline blockPipeline = subKeyInfo.getPipeline();
+    Preconditions.checkNotNull(blockPipeline);
+    List<DatanodeDetails> nodes = blockPipeline.getNodes();
+    for (int i = 0; i < nodes.size(); i++) {
+      DatanodeDetails node = nodes.get(i);
+      List<DatanodeDetails> singleNode = new ArrayList<>();
+      singleNode.add(node);
+      Map<DatanodeDetails, Integer> replicaIndex = new HashMap<>();
+      // Replica indexes are 1-based: the node at position i gets i + 1.
+      replicaIndex.put(node, i + 1);
+      Pipeline nodePipeline = Pipeline.newBuilder()
+          .setId(blockPipeline.getId())
+          .setReplicationConfig(blockPipeline.getReplicationConfig())
+          .setState(blockPipeline.getPipelineState())
+          .setNodes(singleNode)
+          .setReplicaIndexes(replicaIndex)
+          .build();
+
+      ECBlockOutputStreamEntry entry = new ECBlockOutputStreamEntry.Builder()
+          .setBlockID(subKeyInfo.getBlockID())
+          .setKey(getKeyName())
+          .setXceiverClientManager(getXceiverClientFactory())
+          .setPipeline(nodePipeline)
+          .setConfig(getConfig())
+          .setLength(subKeyInfo.getLength())
+          .setBufferPool(getBufferPool())
+          .setToken(subKeyInfo.getToken())
+          .build();
+      getStreamEntries().add(entry);
+    }
+  }
+
+  /**
+   * Returns the location infos of the finished entries followed by those of
+   * the current stream entries.
+   */
+  @Override
+  public List<OmKeyLocationInfo> getLocationInfoList() {
+    List<OmKeyLocationInfo> current =
+        getOmKeyLocationInfos(getStreamEntries());
+    List<OmKeyLocationInfo> all =
+        getOmKeyLocationInfos(finishedStreamEntries);
+    all.addAll(current);
+    return all;
+  }
+
+  /**
+   * Sums the current position of every current and finished entry whose
+   * replica index does not exceed {@code ecReplicationConfig.getData()}.
+   */
+  @Override
+  long getKeyLength() {
+    return dataLength(getStreamEntries()) + dataLength(finishedStreamEntries);
+  }
+
+  /** Sums the current position of the entries accepted by isDataEntry. */
+  private long dataLength(List<BlockOutputStreamEntry> entries) {
+    return entries.stream()
+        .filter(this::isDataEntry)
+        .mapToLong(BlockOutputStreamEntry::getCurrentPosition)
+        .sum();
+  }
+
+  /**
+   * Checks the replica index of the first node of the entry's pipeline
+   * against the data count of the EC replication config.
+   */
+  private boolean isDataEntry(BlockOutputStreamEntry entry) {
+    Pipeline pipeline = entry.getPipeline();
+    DatanodeDetails node = pipeline.getNodes().iterator().next();
+    return pipeline.getReplicaIndex(node) <= ecReplicationConfig.getData();
+  }
+
+  /**
+   * Moves all current stream entries to the finished list, closes the
+   * entries that were just moved and then runs the parent's cleanup.
+   *
+   * @throws IOException if closing an entry fails
+   */
+  public void endECBlock() throws IOException {
+    List<BlockOutputStreamEntry> entries = getStreamEntries();
+    // Entries before this position were closed by an earlier call and must
+    // not be closed again.
+    int firstNew = finishedStreamEntries.size();
+    // Remove from the tail so that no remaining element has to be shifted.
+    for (int i = entries.size() - 1; i >= 0; i--) {
+      finishedStreamEntries.add(entries.remove(i));
+    }
+
+    for (int i = firstNew; i < finishedStreamEntries.size(); i++) {
+      finishedStreamEntries.get(i).close();
+    }
+    super.cleanup();
+  }
+
+  /**
+   * Calls executePutBlock on every current stream entry.
+   *
+   * @throws IOException if any putBlock request fails
+   */
+  void executePutBlockForAll() throws IOException {
+    for (BlockOutputStreamEntry entry : getStreamEntries()) {
+      ((ECBlockOutputStreamEntry) entry).executePutBlock();
+    }
+  }
+
+  /** Runs the parent's cleanup and also drops all finished entries. */
+  void cleanupAll() {
+    super.cleanup();
+    finishedStreamEntries.clear();
+  }
+
+  /**
+   * Advances the current stream index by one, wrapping at {@code rotation}.
+   *
+   * @param rotation modulus applied to the incremented index
+   */
+  @Override
+  public void updateToNextStream(int rotation) {
+    super.setCurrIdx((getCurrIdx() + 1) % rotation);
+  }
+
+}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
new file mode 100644
index 0000000..474dcab
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.ErasureCodecOptions;
+import org.apache.hadoop.io.erasurecode.codec.RSErasureCodec;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ECKeyOutputStream handles the EC writes by writing the data into underlying
+ * block output streams chunk by chunk.
+ */
+public class ECKeyOutputStream extends KeyOutputStream {
+  private OzoneClientConfig config;
+  private ECChunkBuffers ecChunkBufferCache;
+  private int ecChunkSize = 1024;
+  private final int numDataBlks;
+  private final int numParityBlks;
+  private static final ByteBufferPool BUFFER_POOL = new 
ElasticByteBufferPool();
+  private final RawErasureEncoder encoder;
+  // TODO: EC: Currently using the below EC Schema. This has to be modified and
+  //  created dynamically once OM return the configured scheme details.
+  private static final String DEFAULT_CODEC_NAME = "rs";
+
+  private long currentBlockGroupLen = 0;
+  /**
+   * Defines stream action while calling handleFlushOrClose.
+   */
+  enum StreamAction {
+    FLUSH, CLOSE, FULL
+  }
+
+  // Logger for this class, so EC write messages are attributed to
+  // ECKeyOutputStream rather than to the parent KeyOutputStream.
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ECKeyOutputStream.class);
+
+  private boolean closed;
+  private FileEncryptionInfo feInfo;
+  // how much of data is actually written yet to underlying stream
+  private long offset;
+  // how much data has been ingested into the stream
+  private long writeOffset;
+  // whether an exception is encountered while write and whole write could
+  // not succeed
+  private boolean isException;
+  private final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool;
+
+  /**
+   * Returns the stream entries currently held by the entry pool.
+   *
+   * @return whatever the pool's {@code getStreamEntries()} returns.
+   */
+  @VisibleForTesting
+  public List<BlockOutputStreamEntry> getStreamEntries() {
+    return blockOutputStreamEntryPool.getStreamEntries();
+  }
+
+  /**
+   * Returns the client factory used by the entry pool.
+   *
+   * @return whatever the pool's {@code getXceiverClientFactory()} returns.
+   */
+  @VisibleForTesting
+  public XceiverClientFactory getXceiverClientFactory() {
+    return blockOutputStreamEntryPool.getXceiverClientFactory();
+  }
+
+  /**
+   * Returns the key location information tracked by the entry pool.
+   *
+   * @return whatever the pool's {@code getLocationInfoList()} returns.
+   */
+  @VisibleForTesting
+  public List<OmKeyLocationInfo> getLocationInfoList() {
+    return blockOutputStreamEntryPool.getLocationInfoList();
+  }
+
+  /**
+   * Creates an EC key output stream: sizes the cell buffers, creates the
+   * block stream entry pool and builds the raw erasure encoder.
+   *
+   * Note that the supplied {@code config} is modified: its stream buffer
+   * size, flush size and max size are all set to the EC chunk size.
+   *
+   * @param config client configuration; mutated as described above.
+   * @param handler open key session providing the key info and session id.
+   * @param xceiverClientManager client factory handed to the entry pool.
+   * @param omClient Ozone Manager client handed to the entry pool.
+   * @param chunkSize not referenced by this constructor.
+   * @param requestId request id handed to the entry pool.
+   * @param replicationConfig replication config; cast to
+   *                          {@link ECReplicationConfig}.
+   * @param uploadID multipart upload id handed to the entry pool.
+   * @param partNumber multipart part number handed to the entry pool.
+   * @param isMultipart multipart flag handed to the entry pool.
+   * @param unsafeByteBufferConversion flag handed to the entry pool.
+   */
+  @SuppressWarnings({"parameternumber", "squid:S00107"})
+  public ECKeyOutputStream(OzoneClientConfig config, OpenKeySession handler,
+      XceiverClientFactory xceiverClientManager, OzoneManagerProtocol omClient,
+      int chunkSize, String requestId, ReplicationConfig replicationConfig,
+      String uploadID, int partNumber, boolean isMultipart,
+      boolean unsafeByteBufferConversion) {
+    this.config = config;
+    // For EC, cell/chunk size and buffer size can be same for now.
+    this.config.setStreamBufferMaxSize(ecChunkSize);
+    this.config.setStreamBufferFlushSize(ecChunkSize);
+    this.config.setStreamBufferSize(ecChunkSize);
+    // NOTE(review): the assert is only evaluated when the JVM runs with -ea;
+    // without it a wrong config type fails on the casts below instead.
+    assert replicationConfig instanceof ECReplicationConfig;
+    this.numDataBlks = ((ECReplicationConfig) replicationConfig).getData();
+    this.numParityBlks = ((ECReplicationConfig) replicationConfig).getParity();
+    ecChunkBufferCache =
+        new ECChunkBuffers(ecChunkSize, numDataBlks, numParityBlks);
+    OmKeyInfo info = handler.getKeyInfo();
+    blockOutputStreamEntryPool =
+        new ECBlockOutputStreamEntryPool(config, omClient, requestId,
+            replicationConfig, uploadID, partNumber, isMultipart, info,
+            unsafeByteBufferConversion, xceiverClientManager, handler.getId());
+
+    // Retrieve the file encryption key info, null if file is not in
+    // encrypted bucket.
+    this.feInfo = info.getFileEncryptionInfo();
+    this.isException = false;
+    this.writeOffset = 0;
+    OzoneConfiguration conf = new OzoneConfiguration();
+    ECSchema schema =
+        new ECSchema(DEFAULT_CODEC_NAME, numDataBlks, numParityBlks);
+    ErasureCodecOptions options = new ErasureCodecOptions(schema);
+    RSErasureCodec codec = new RSErasureCodec(conf, options);
+    // NOTE(review): the codec name is taken from the second system policy
+    // rather than from DEFAULT_CODEC_NAME - confirm both name the same codec.
+    this.encoder = CodecUtil.createRawEncoder(conf,
+        SystemErasureCodingPolicies.getPolicies().get(1).getCodecName(),
+        codec.getCoderOptions());
+  }
+
+  /**
+   * When a key is opened, it is possible that there are some blocks already
+   * allocated to it for this open session. In this case, to make use of
+   * these blocks, we need to add these blocks to stream entries. But a key's
+   * version also includes blocks from previous versions; we need to avoid
+   * adding these old blocks to stream entries, because they should not be
+   * picked for write. The filtering is delegated to the entry pool, which
+   * receives both arguments unchanged.
+   *
+   * @param version     the set of blocks that are pre-allocated.
+   * @param openVersion the version corresponding to the pre-allocation.
+   * @throws IOException if the entry pool fails to add the blocks.
+   */
+  public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
+      long openVersion) throws IOException {
+    blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
+  }
+
+  /**
+   * Try to write the bytes sequence b[off:off+len) to underlying EC block
+   * streams.
+   *
+   * The input is consumed in three steps: first whatever fits into the
+   * partially filled current cell, then as many whole cells as remain, and
+   * finally a trailing partial cell. After each step the parity cells are
+   * written if the data cells of the stripe are complete.
+   *
+   * @param b   byte data
+   * @param off starting offset
+   * @param len length to write
+   * @throws IOException if the stream is closed or a write fails.
+   * @throws NullPointerException if {@code b} is null.
+   * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not
+   *         describe a valid range of {@code b}.
+   */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    checkNotClosed();
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+        || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return;
+    }
+
+    int currentChunkBufferRemainingLength =
+        ecChunkBufferCache.dataBuffers[blockOutputStreamEntryPool.getCurrIdx()]
+            .remaining();
+    int currentChunkBufferLen =
+        ecChunkBufferCache.dataBuffers[blockOutputStreamEntryPool.getCurrIdx()]
+            .position();
+    int maxLenToCurrChunkBuffer = Math.min(len, ecChunkSize);
+    int currentWriterChunkLenToWrite =
+        Math.min(currentChunkBufferRemainingLength, maxLenToCurrChunkBuffer);
+    // Step 1: top up the current cell; it counts as full only if this write
+    // brings it exactly to the cell size.
+    handleWrite(b, off, currentWriterChunkLenToWrite,
+        currentChunkBufferLen + currentWriterChunkLenToWrite == ecChunkSize,
+        false);
+    checkAndWriteParityCells();
+    // Skip the bytes just consumed; without this the steps below would
+    // re-read the caller's data from the original offset.
+    off += currentWriterChunkLenToWrite;
+
+    int remLen = len - currentWriterChunkLenToWrite;
+    int iters = remLen / ecChunkSize;
+    int lastCellSize = remLen % ecChunkSize;
+    // Step 2: whole cells.
+    while (iters > 0) {
+      handleWrite(b, off, ecChunkSize, true, false);
+      off += ecChunkSize;
+      iters--;
+      checkAndWriteParityCells();
+    }
+
+    // Step 3: trailing partial cell.
+    if (lastCellSize > 0) {
+      handleWrite(b, off, lastCellSize, false, false);
+      checkAndWriteParityCells();
+    }
+    writeOffset += len;
+  }
+
+  /**
+   * Finishes the stripe once all data cells are written: when the pool's
+   * current index equals the number of data blocks, the parity cells are
+   * encoded and written, put-block is executed for all streams and the cell
+   * buffers are reset. If the block group has reached its full data length,
+   * the EC block is ended as well.
+   *
+   * @throws IOException if writing parity, put-block or ending the block
+   *         fails.
+   */
+  private void checkAndWriteParityCells() throws IOException {
+    // The index points just past the last data stream exactly when every
+    // data cell of the stripe has been written.
+    if (blockOutputStreamEntryPool.getCurrIdx() == numDataBlks) {
+      // Encode the data cells and write the resulting parity cells.
+      writeParityCells();
+      // By this time, we should have finished full stripe. So, lets call
+      // executePutBlock for all.
+      // TODO: we should alter the put block calls to share CRC to each stream.
+      blockOutputStreamEntryPool.executePutBlockForAll();
+      ecChunkBufferCache.clear();
+
+      // The block group is full when the data written equals the number of
+      // data blocks times the length of the entry at the current index.
+      // NOTE(review): presumably the index is back at 0 here, after the
+      // parity writes advanced it - confirm.
+      if (currentBlockGroupLen == numDataBlks * blockOutputStreamEntryPool
+          .getStreamEntries().get(blockOutputStreamEntryPool.getCurrIdx())
+          .getLength()) {
+        blockOutputStreamEntryPool.endECBlock();
+        currentBlockGroupLen = 0;
+      }
+    }
+  }
+
+  /**
+   * Flips the buffered data cells, encodes them into the parity buffers and
+   * writes each parity buffer as a full parity cell.
+   *
+   * @throws IOException if encoding or writing a parity cell fails.
+   */
+  void writeParityCells() throws IOException {
+    final ByteBuffer[] dataCells = ecChunkBufferCache.getDataBuffers();
+    // Flip each data cell so the encoder reads what was buffered.
+    for (int d = 0; d < numDataBlks; d++) {
+      dataCells[d].flip();
+    }
+
+    final ByteBuffer[] parityCells = ecChunkBufferCache.getParityBuffers();
+    encoder.encode(dataCells, parityCells);
+    for (int p = 0; p < this.numParityBlks; p++) {
+      handleWrite(parityCells[p].array(), 0, ecChunkSize, true, true);
+    }
+  }
+
+  /**
+   * Buffers one cell (or part of one) and, when the cell is full, writes it
+   * to the current block stream and advances to the next stream.
+   *
+   * Data bytes are always appended to the cached data buffer of the current
+   * index. Nothing is sent to the block stream for a partial cell; a full
+   * cell is written from the cached buffer's backing array, not from
+   * {@code b}.
+   *
+   * @param b          source bytes; only read for data cells.
+   * @param off        offset into {@code b}.
+   * @param len        number of bytes in this cell fragment.
+   * @param isFullCell whether this call completes the current cell.
+   * @param isParity   whether a parity cell is being written.
+   * @throws IOException if allocating a block or closing a stream fails.
+   */
+  private void handleWrite(byte[] b, int off, long len, boolean isFullCell,
+      boolean isParity) throws IOException {
+    if (!isParity) {
+      ecChunkBufferCache
+          .addToDataBuffer(blockOutputStreamEntryPool.getCurrIdx(), b, off,
+              (int) len);
+    }
+    BlockOutputStreamEntry current =
+        blockOutputStreamEntryPool.allocateBlockIfNeeded();
+    int writeLengthToCurrStream =
+        Math.min((int) len, (int) current.getRemaining());
+    // Only data bytes count towards the block group length.
+    currentBlockGroupLen += isParity ? 0 : writeLengthToCurrStream;
+    if (current.getRemaining() <= 0) {
+      // since the current block is already written close the stream.
+      closeCurrentStream(StreamAction.CLOSE);
+    }
+
+    // From here on len is the part that did not fit into the current stream.
+    len -= writeLengthToCurrStream;
+    if (isFullCell) {
+      ByteBuffer bytesToWrite = isParity ?
+          ecChunkBufferCache.getParityBuffers()[blockOutputStreamEntryPool
+              .getCurrIdx() - numDataBlks] :
+          ecChunkBufferCache.getDataBuffers()[blockOutputStreamEntryPool
+              .getCurrIdx()];
+      try {
+        writeToOutputStream(current, len, bytesToWrite.array(),
+            bytesToWrite.array().length, 0, current.getWrittenDataLength(),
+            isParity);
+      } catch (Exception e) {
+        // NOTE(review): the failure is not rethrown or logged; the stream is
+        // only marked closed, so the caller is not told this write failed.
+        markStreamClosed();
+      }
+
+      blockOutputStreamEntryPool
+          .updateToNextStream(numDataBlks + numParityBlks);
+    }
+  }
+
+  /**
+   * Writes {@code writeLen} bytes of {@code b} to the given stream entry and
+   * adds the amount written to {@code offset} for data cells.
+   *
+   * On an {@link IOException} the amount is recomputed from the entry's
+   * written data length relative to {@code currentPos}, and the entry is
+   * passed to {@code handleException}.
+   *
+   * @param current    stream entry to write to.
+   * @param len        remaining length; used for a state check and logging.
+   * @param b          bytes to write.
+   * @param writeLen   number of bytes of {@code b} to write.
+   * @param off        offset into {@code b}.
+   * @param currentPos written data length of the entry before this write.
+   * @param isParity   whether this is a parity cell (not added to offset).
+   * @return the number of bytes accounted as written.
+   * @throws IOException if handling a failed write itself fails.
+   */
+  private int writeToOutputStream(BlockOutputStreamEntry current, long len,
+      byte[] b, int writeLen, int off, long currentPos, boolean isParity)
+      throws IOException {
+    try {
+      current.write(b, off, writeLen);
+      if (!isParity) {
+        offset += writeLen;
+      }
+    } catch (IOException ioe) {
+      // for the current iteration, totalDataWritten - currentPos gives the
+      // amount of data already written to the buffer
+
+      // In the retryPath, the total data to be written will always be equal
+      // to or less than the max length of the buffer allocated.
+      // The len specified here is the combined sum of the data length of
+      // the buffers
+      Preconditions.checkState(len <= config.getStreamBufferMaxSize());
+      int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
+      writeLen = dataWritten;
+
+      if (!isParity) {
+        offset += writeLen;
+      }
+      LOG.debug("writeLen {}, total len {}", writeLen, len);
+      handleException(current, ioe);
+    }
+    return writeLen;
+  }
+
+  /**
+   * Reacts to a failed stream operation: checks that the exception resolves
+   * to a non-null cause and then closes the affected stream entry.
+   *
+   * @param streamEntry entry whose operation failed.
+   * @param exception   the failure that was caught.
+   * @throws IOException if closing the entry fails.
+   */
+  private void handleException(BlockOutputStreamEntry streamEntry,
+      IOException exception) throws IOException {
+    Preconditions.checkNotNull(HddsClientUtils.checkForException(exception));
+    // EC does not retry on a failed stream; the entry is simply closed.
+    streamEntry.close();
+  }
+
+  /**
+   * Runs the entry pool's cleanup and flags this stream as closed, so that
+   * subsequent {@code checkNotClosed()} calls fail.
+   */
+  private void markStreamClosed() {
+    blockOutputStreamEntryPool.cleanup();
+    closed = true;
+  }
+
+  /**
+   * Flushes the current stream entry, if there is one.
+   *
+   * @throws IOException if the stream is closed or the flush fails.
+   */
+  @Override
+  public void flush() throws IOException {
+    checkNotClosed();
+    handleFlushOrClose(StreamAction.FLUSH);
+  }
+
+  /**
+   * Closes or flushes the pool's current stream entry depending on the
+   * action. Does nothing when the pool is empty or has no current entry.
+   *
+   * If the action fails with an {@link IOException}, the entry is handed to
+   * {@code handleException} and the action is attempted again. Any exception
+   * escaping that handling marks the whole stream closed and is rethrown.
+   *
+   * @param op Flag which decides whether to call close or flush on the
+   *           outputStream.
+   * @throws IOException In case, flush or close fails with exception.
+   */
+  @SuppressWarnings("squid:S1141")
+  private void handleFlushOrClose(StreamAction op) throws IOException {
+    if (!blockOutputStreamEntryPool.isEmpty()) {
+      while (true) {
+        try {
+          BlockOutputStreamEntry entry =
+              blockOutputStreamEntryPool.getCurrentStreamEntry();
+          if (entry != null) {
+            try {
+              handleStreamAction(entry, op);
+            } catch (IOException ioe) {
+              handleException(entry, ioe);
+              // Try again with whatever entry is current now.
+              continue;
+            }
+          }
+          return;
+        } catch (Exception e) {
+          markStreamClosed();
+          throw e;
+        }
+      }
+    }
+  }
+
+  /**
+   * Applies the given action to every stream entry held by the pool, in list
+   * order. Does nothing when the pool is empty.
+   *
+   * If the action fails on an entry with an {@link IOException}, the entry
+   * is handed to {@code handleException} and the action is attempted again
+   * on that entry. Any exception escaping that handling marks the whole
+   * stream closed and is rethrown.
+   *
+   * @param op action to apply to each entry.
+   * @throws IOException if the action cannot be completed for an entry.
+   */
+  private void handleFlushOrCloseAllStreams(StreamAction op)
+      throws IOException {
+    if (blockOutputStreamEntryPool.isEmpty()) {
+      return;
+    }
+    List<BlockOutputStreamEntry> allStreamEntries =
+        blockOutputStreamEntryPool.getStreamEntries();
+    for (int i = 0; i < allStreamEntries.size(); i++) {
+      while (true) {
+        try {
+          BlockOutputStreamEntry entry = allStreamEntries.get(i);
+          if (entry != null) {
+            try {
+              handleStreamAction(entry, op);
+            } catch (IOException ioe) {
+              handleException(entry, ioe);
+              continue;
+            }
+          }
+          // This entry is done. Leave only the retry loop so the remaining
+          // entries are processed too; returning here would stop after the
+          // first entry.
+          break;
+        } catch (Exception e) {
+          markStreamClosed();
+          throw e;
+        }
+      }
+    }
+  }
+
+  /**
+   * Applies the given action to a stream entry of the pool. Does nothing
+   * when the pool is empty.
+   *
+   * NOTE(review): despite the name, this walks the entry list from index 0
+   * and returns as soon as one entry has been handled, so it acts on the
+   * first listed entry rather than on the pool's current entry - confirm
+   * this is intended.
+   *
+   * @param op action to apply.
+   * @throws IOException if the action cannot be completed.
+   */
+  private void closeCurrentStream(StreamAction op) throws IOException {
+    if (!blockOutputStreamEntryPool.isEmpty()) {
+      List<BlockOutputStreamEntry> allStreamEntries =
+          blockOutputStreamEntryPool.getStreamEntries();
+      for (int i = 0; i < allStreamEntries.size(); i++) {
+        while (true) {
+          try {
+            BlockOutputStreamEntry entry = allStreamEntries.get(i);
+            if (entry != null) {
+              try {
+                handleStreamAction(entry, op);
+              } catch (IOException ioe) {
+                handleException(entry, ioe);
+                // Attempt the action again on the same entry.
+                continue;
+              }
+            }
+            return;
+          } catch (Exception e) {
+            // Anything escaping the handling above closes the whole stream.
+            markStreamClosed();
+            throw e;
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Performs a single action on a stream entry after recording the entry's
+   * failed datanodes, if any, in the pool's exclude list.
+   *
+   * @param entry stream entry to act on.
+   * @param op    CLOSE closes the entry, FULL closes it only when nothing
+   *              remains to be written, FLUSH flushes it.
+   * @throws IOException if the action fails or {@code op} is not supported.
+   */
+  private void handleStreamAction(BlockOutputStreamEntry entry,
+      StreamAction op) throws IOException {
+    Collection<DatanodeDetails> failedServers = entry.getFailedServers();
+    // failed servers can be null in case there is no data written in
+    // the stream
+    if (failedServers != null && !failedServers.isEmpty()) {
+      blockOutputStreamEntryPool.getExcludeList().addDatanodes(failedServers);
+    }
+    switch (op) {
+    case CLOSE:
+      entry.close();
+      break;
+    case FULL:
+      if (entry.getRemaining() == 0) {
+        entry.close();
+      }
+      break;
+    case FLUSH:
+      entry.flush();
+      break;
+    default:
+      throw new IOException("Invalid Operation");
+    }
+  }
+
+  /**
+   * Commit the key to OM, this will add the blocks as the new key blocks.
+   *
+   * Closes the stream entries, ends the current EC block and commits the
+   * key with the number of data bytes written. Calling it again after the
+   * first call is a no-op. The entry pool is cleaned up and the cell
+   * buffers are handed back to the buffer pool whether or not the commit
+   * succeeds.
+   *
+   * @throws IOException if closing the streams or committing the key fails.
+   */
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    try {
+      handleFlushOrCloseAllStreams(StreamAction.CLOSE);
+      if (!isException) {
+        Preconditions.checkArgument(writeOffset == offset);
+      }
+      blockOutputStreamEntryPool.endECBlock();
+      blockOutputStreamEntryPool.commitKey(offset);
+    } finally {
+      try {
+        blockOutputStreamEntryPool.cleanupAll();
+      } finally {
+        // Release on every path, otherwise a failed close leaks the pooled
+        // buffers.
+        ecChunkBufferCache.release();
+      }
+    }
+  }
+
+  /**
+   * Returns the multipart commit information held by the entry pool.
+   *
+   * @return whatever the pool's {@code getCommitUploadPartInfo()} returns.
+   */
+  public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+    return blockOutputStreamEntryPool.getCommitUploadPartInfo();
+  }
+
+  /**
+   * Returns the file encryption info read from the key info when this
+   * stream was constructed.
+   *
+   * @return the stored encryption info.
+   */
+  public FileEncryptionInfo getFileEncryptionInfo() {
+    return feInfo;
+  }
+
+  /**
+   * Returns the exclude list maintained by the entry pool.
+   *
+   * @return whatever the pool's {@code getExcludeList()} returns.
+   */
+  @VisibleForTesting
+  public ExcludeList getExcludeList() {
+    return blockOutputStreamEntryPool.getExcludeList();
+  }
+
+  /**
+   * Builder class of ECKeyOutputStream.
+   *
+   * Each setter stores one constructor argument and returns this builder so
+   * that calls can be chained; {@link #build()} passes the stored values to
+   * the ECKeyOutputStream constructor. Values that are never set keep their
+   * Java defaults (null, 0 or false).
+   */
+  public static class Builder {
+    private OpenKeySession openHandler;
+    private XceiverClientFactory xceiverManager;
+    private OzoneManagerProtocol omClient;
+    private int chunkSize;
+    private String requestID;
+    private String multipartUploadID;
+    private int multipartNumber;
+    private boolean isMultipartKey;
+    private boolean unsafeByteBufferConversion;
+    private OzoneClientConfig clientConfig;
+    private ReplicationConfig replicationConfig;
+
+    public Builder setMultipartUploadID(String uploadID) {
+      this.multipartUploadID = uploadID;
+      return this;
+    }
+
+    public Builder setMultipartNumber(int partNumber) {
+      this.multipartNumber = partNumber;
+      return this;
+    }
+
+    public Builder setHandler(OpenKeySession handler) {
+      this.openHandler = handler;
+      return this;
+    }
+
+    public Builder setXceiverClientManager(XceiverClientFactory manager) {
+      this.xceiverManager = manager;
+      return this;
+    }
+
+    public Builder setOmClient(OzoneManagerProtocol client) {
+      this.omClient = client;
+      return this;
+    }
+
+    public Builder setChunkSize(int size) {
+      this.chunkSize = size;
+      return this;
+    }
+
+    public Builder setRequestID(String id) {
+      this.requestID = id;
+      return this;
+    }
+
+    public Builder setIsMultipartKey(boolean isMultipart) {
+      this.isMultipartKey = isMultipart;
+      return this;
+    }
+
+    public Builder setConfig(OzoneClientConfig config) {
+      this.clientConfig = config;
+      return this;
+    }
+
+    public Builder enableUnsafeByteBufferConversion(boolean enabled) {
+      this.unsafeByteBufferConversion = enabled;
+      return this;
+    }
+
+    public ECKeyOutputStream.Builder setReplicationConfig(
+        ReplicationConfig replConfig) {
+      this.replicationConfig = replConfig;
+      return this;
+    }
+
+    /**
+     * Creates the stream from the values collected so far.
+     *
+     * @return a new ECKeyOutputStream.
+     */
+    public ECKeyOutputStream build() {
+      return new ECKeyOutputStream(clientConfig, openHandler, xceiverManager,
+          omClient, chunkSize, requestID, replicationConfig, multipartUploadID,
+          multipartNumber, isMultipartKey, unsafeByteBufferConversion);
+    }
+  }
+
+  /**
+   * Verify that the output stream is open. Non blocking; this reads the
+   * current value of the {@link #closed} field, which is a plain
+   * (non-volatile) boolean.
+   *
+   * @throws IOException if the stream is closed; the message includes the
+   *         key name reported by the entry pool.
+   */
+  private void checkNotClosed() throws IOException {
+    if (closed) {
+      throw new IOException(
+          ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
+              + blockOutputStreamEntryPool.getKeyName());
+    }
+  }
+
+  /**
+   * Cell buffers for one stripe: one buffer per data block and one per
+   * parity block, all taken from the shared buffer pool and limited to the
+   * cell size.
+   */
+  private static class ECChunkBuffers {
+    private final ByteBuffer[] dataBuffers;
+    private final ByteBuffer[] parityBuffers;
+    private final int dataBlks;
+    private final int parityBlks;
+    private int cellSize;
+
+    ECChunkBuffers(int cellSize, int numData, int numParity) {
+      this.cellSize = cellSize;
+      this.dataBlks = numData;
+      this.parityBlks = numParity;
+      this.dataBuffers = new ByteBuffer[numData];
+      this.parityBuffers = new ByteBuffer[numParity];
+      allocateBuffers(cellSize, this.dataBuffers);
+      allocateBuffers(cellSize, this.parityBuffers);
+    }
+
+    private ByteBuffer[] getDataBuffers() {
+      return dataBuffers;
+    }
+
+    private ByteBuffer[] getParityBuffers() {
+      return parityBuffers;
+    }
+
+    /**
+     * Appends {@code len} bytes of {@code b}, starting at {@code off}, to
+     * the data buffer at index {@code i}.
+     *
+     * @return the buffer position after the append.
+     * @throws IllegalStateException if the append would exceed the cell
+     *         size.
+     */
+    private int addToDataBuffer(int i, byte[] b, int off, int len) {
+      final ByteBuffer cell = dataBuffers[i];
+      final int endPosition = cell.position() + len;
+      Preconditions.checkState(endPosition <= cellSize);
+      cell.put(b, off, len);
+      return endPosition;
+    }
+
+    /** Resets every data and parity buffer for the next stripe. */
+    private void clear() {
+      clearBuffers(cellSize, dataBuffers);
+      clearBuffers(cellSize, parityBuffers);
+    }
+
+    /** Hands every data and parity buffer back to the pool. */
+    private void release() {
+      releaseBuffers(dataBuffers);
+      releaseBuffers(parityBuffers);
+    }
+
+    private static void allocateBuffers(int cellSize, ByteBuffer[] buffers) {
+      for (int idx = 0; idx < buffers.length; idx++) {
+        final ByteBuffer cell = BUFFER_POOL.getBuffer(false, cellSize);
+        buffers[idx] = cell;
+        cell.limit(cellSize);
+      }
+    }
+
+    private static void clearBuffers(int cellSize, ByteBuffer[] buffers) {
+      for (ByteBuffer cell : buffers) {
+        cell.clear();
+        cell.limit(cellSize);
+      }
+    }
+
+    private static void releaseBuffers(ByteBuffer[] buffers) {
+      for (int idx = 0; idx < buffers.length; idx++) {
+        final ByteBuffer cell = buffers[idx];
+        if (cell == null) {
+          // Already released.
+          continue;
+        }
+        BUFFER_POOL.putBuffer(cell);
+        buffers[idx] = null;
+      }
+    }
+  }
+}
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index b49d05d..87da492 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -71,6 +71,7 @@ import 
org.apache.hadoop.ozone.client.OzoneMultipartUploadList;
 import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
 import org.apache.hadoop.ozone.client.io.KeyInputStream;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.LengthInputStream;
@@ -1365,16 +1366,25 @@ public class RpcClient implements ClientProtocol {
   private OzoneOutputStream createOutputStream(OpenKeySession openKey,
       String requestId, ReplicationConfig replicationConfig)
       throws IOException {
-    KeyOutputStream keyOutputStream =
-        new KeyOutputStream.Builder()
-            .setHandler(openKey)
-            .setXceiverClientManager(xceiverClientManager)
-            .setOmClient(ozoneManagerClient)
-            .setRequestID(requestId)
-            .setReplicationConfig(replicationConfig)
-            .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
-            .setConfig(clientConfig)
-            .build();
+    KeyOutputStream keyOutputStream = null;
+
+    if (openKey.getKeyInfo().getReplicationConfig()
+        .getReplicationType() == HddsProtos.ReplicationType.EC) {
+      keyOutputStream = new ECKeyOutputStream.Builder().setHandler(openKey)
+          .setXceiverClientManager(xceiverClientManager)
+          .setOmClient(ozoneManagerClient).setRequestID(requestId)
+          .setReplicationConfig(replicationConfig)
+          .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
+          .setConfig(clientConfig).build();
+    } else {
+      keyOutputStream = new KeyOutputStream.Builder().setHandler(openKey)
+          .setXceiverClientManager(xceiverClientManager)
+          .setOmClient(ozoneManagerClient).setRequestID(requestId)
+          .setReplicationConfig(replicationConfig)
+          .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
+          .setConfig(clientConfig).build();
+    }
+
     keyOutputStream
         .addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(),
             openKey.getOpenVersion());
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
index cb9875b..73ab96b 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
@@ -48,7 +48,8 @@ public class MockDatanodeStorage {
   public void writeChunk(
       DatanodeBlockID blockID,
       ChunkInfo chunkInfo, ByteString bytes) {
-    data.put(createKey(blockID, chunkInfo), bytes);
+    data.put(createKey(blockID, chunkInfo),
+        ByteString.copyFrom(bytes.toByteArray()));
     chunks.put(createKey(blockID, chunkInfo), chunkInfo);
   }
 
@@ -70,4 +71,8 @@ public class MockDatanodeStorage {
         + chunkInfo.getChunkName() + "_" + chunkInfo.getOffset();
   }
 
+  /**
+   * Returns the internal map of stored chunk data. This is the live map,
+   * not a copy.
+   */
+  public Map<String, ByteString> getAllBlockData(){
+    return this.data;
+  }
+
 }
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
index 17a7f6b..f38ee35 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
@@ -258,6 +258,10 @@ public class MockOmTransport implements OmTransport {
     return CreateBucketResponse.newBuilder().build();
   }
 
+  /**
+   * Returns the internal nested map of key infos held by this mock. This is
+   * the live map, not a copy.
+   */
+  public Map<String, Map<String, Map<String, KeyInfo>>> getKeys(){
+    return this.keys;
+  }
+
   @Override
   public Text getDelegationTokenService() {
     return null;
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
index a5fa2bb..da72651 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
@@ -67,6 +67,13 @@ public class MockXceiverClientFactory
       boolean b) {
 
   }
-};
+
+  /**
+   * Returns the internal map from datanode details to the mock storage kept
+   * for that datanode. This is the live map, not a copy.
+   */
+  public Map<DatanodeDetails, MockDatanodeStorage> getStorages() {
+    return this.storage;
+  }
+}
 
 
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
new file mode 100644
index 0000000..7ba8adc
--- /dev/null
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Allocates the block with required number of nodes in the pipeline.
+ */
+public class MultiNodePipelineBlockAllocator implements MockBlockAllocator {
+  private long blockId;
+  private HddsProtos.Pipeline pipeline;
+  private int requiredNodes;
+
+  public MultiNodePipelineBlockAllocator(int requiredNodes) {
+    this.requiredNodes = requiredNodes;
+  }
+
+  @Override
+  public Iterable<? extends OzoneManagerProtocolProtos.KeyLocation>
+      allocateBlock(OzoneManagerProtocolProtos.KeyArgs keyArgs) {
+    if (pipeline == null) {
+      HddsProtos.Pipeline.Builder builder =
+          HddsProtos.Pipeline.newBuilder().setFactor(keyArgs.getFactor())
+              .setType(keyArgs.getType()).setId(
+              HddsProtos.PipelineID.newBuilder().setUuid128(
+                  HddsProtos.UUID.newBuilder().setLeastSigBits(1L)
+                      .setMostSigBits(1L).build()).build());
+
+      for (int i = 1; i <= requiredNodes; i++) {
+        builder.addMembers(HddsProtos.DatanodeDetailsProto.newBuilder()
+            .setUuid128(HddsProtos.UUID.newBuilder().setLeastSigBits(i)
+                .setMostSigBits(i).build()).setHostName("localhost")
+            .setIpAddress("1.2.3.4").addPorts(
+                HddsProtos.Port.newBuilder().setName("RATIS").setValue(1234 + 
i)
+                    .build()).build());
+      }
+      pipeline = builder.build();
+    }
+
+    List<OzoneManagerProtocolProtos.KeyLocation> results = new ArrayList<>();
+    results.add(OzoneManagerProtocolProtos.KeyLocation.newBuilder()
+        .setPipeline(pipeline).setBlockID(
+            HddsProtos.BlockID.newBuilder().setBlockCommitSequenceId(1L)
+                .setContainerBlockID(
+                    HddsProtos.ContainerBlockID.newBuilder().setContainerID(1L)
+                        .setLocalID(blockId++).build()).build()).setOffset(0L)
+        .setLength(keyArgs.getDataSize()).build());
+    return results;
+  }
+}
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
new file mode 100644
index 0000000..56e7d34
--- /dev/null
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.InMemoryConfiguration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.ErasureCodecOptions;
+import org.apache.hadoop.io.erasurecode.codec.RSErasureCodec;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.client.rpc.RpcClient;
+import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Real unit test for OzoneECClient.
+ * <p>
+ * Used for testing Ozone client without external network calls.
+ */
+public class TestOzoneECClient {
+  // EC layout under test: 3 data blocks and 2 parity blocks, fed with
+  // three 1024-byte input chunks.
+  private final int chunkSize = 1024;
+  private final int dataBlocks = 3;
+  private final int parityBlocks = 2;
+  private OzoneClient client;
+  private ObjectStore store;
+  private final String keyName = UUID.randomUUID().toString();
+  // One chunk per data block; the contents are set by initInputChunks().
+  private final byte[][] inputChunks = new byte[dataBlocks][chunkSize];
+  // Declared with the mock type so that tests can read the stored blocks
+  // without casting.
+  private final MockXceiverClientFactory factoryStub =
+      new MockXceiverClientFactory();
+  private final MockOmTransport transportStub = new MockOmTransport(
+      new MultiNodePipelineBlockAllocator(dataBlocks + parityBlocks));
+  private final ECSchema schema =
+      new ECSchema("rs", dataBlocks, parityBlocks);
+  private final ErasureCodecOptions options = new ErasureCodecOptions(schema);
+  private final OzoneConfiguration conf = new OzoneConfiguration();
+  private final RSErasureCodec codec = new RSErasureCodec(conf, options);
+  // Reference encoder used to compute the expected parity.
+  // NOTE(review): relies on the system policy at index 1 using the same
+  // codec as the schema above - confirm.
+  private final RawErasureEncoder encoder = CodecUtil.createRawEncoder(conf,
+      SystemErasureCodingPolicies.getPolicies().get(1).getCodecName(),
+      codec.getCoderOptions());
+
+  /**
+   * Builds an OzoneClient whose OM transport and datanode client factory
+   * are replaced by the in-memory stubs, then prepares the input chunks.
+   */
+  @Before
+  public void init() throws IOException {
+    ConfigurationSource config = new InMemoryConfiguration();
+    RpcClient rpcClient = new RpcClient(config, null) {
+
+      @Override
+      protected OmTransport createOmTransport(String omServiceId)
+          throws IOException {
+        return transportStub;
+      }
+
+      @Override
+      protected XceiverClientFactory createXceiverClientFactory(
+          List<X509Certificate> x509Certificates) throws IOException {
+        return factoryStub;
+      }
+    };
+    client = new OzoneClient(config, rpcClient);
+
+    store = client.getObjectStore();
+    initInputChunks();
+  }
+
+  /**
+   * Fills chunk {@code i} with the digit {@code i + 1} repeated
+   * {@code chunkSize} times, so each chunk is recognisable by content.
+   */
+  private void initInputChunks() {
+    for (int i = 0; i < dataBlocks; i++) {
+      inputChunks[i] = getBytesWith(i + 1, chunkSize);
+    }
+  }
+
+  /**
+   * Returns the UTF-8 bytes of the decimal form of the given number
+   * repeated {@code total} times.
+   *
+   * @param singleDigitNumber number to repeat; with a single digit the
+   *                          result is exactly {@code total} bytes long.
+   * @param total             how many times the number is appended.
+   */
+  private byte[] getBytesWith(int singleDigitNumber, int total) {
+    // Size the builder for the expected output instead of passing the
+    // digit itself as the initial capacity.
+    StringBuilder builder = new StringBuilder(total);
+    for (int i = 0; i < total; i++) {
+      builder.append(singleDigitNumber);
+    }
+    return builder.toString().getBytes(UTF_8);
+  }
+
+  @After
+  public void close() throws IOException {
+    client.close();
+  }
+
+  /**
+   * Writes an EC key and checks that the first {@code dataBlocks} datanodes
+   * (in sorted order) each hold exactly one block with the matching input
+   * chunk.
+   */
+  @Test
+  public void testPutECKeyAndCheckDNStoredData() throws IOException {
+    OzoneBucket bucket = writeIntoECKey(inputChunks, keyName);
+    OzoneKey key = bucket.getKey(keyName);
+    Assert.assertEquals(keyName, key.getName());
+
+    List<MockDatanodeStorage> storages = getStoragesInDatanodeOrder();
+    for (int i = 0; i < inputChunks.length; i++) {
+      ByteString content = getSingleBlockContent(storages.get(i));
+      Assert.assertEquals(new String(inputChunks[i], UTF_8),
+          content.toStringUtf8());
+    }
+  }
+
+  /**
+   * Writes an EC key and checks that the datanodes following the data
+   * nodes (in sorted order) hold the parity computed by the reference
+   * encoder from the same input chunks.
+   */
+  @Test
+  public void testPutECKeyAndCheckParityData() throws IOException {
+    OzoneBucket bucket = writeIntoECKey(inputChunks, keyName);
+
+    final ByteBuffer[] dataBuffers = new ByteBuffer[dataBlocks];
+    for (int i = 0; i < inputChunks.length; i++) {
+      dataBuffers[i] = ByteBuffer.wrap(inputChunks[i]);
+    }
+    final ByteBuffer[] parityBuffers = new ByteBuffer[parityBlocks];
+    for (int i = 0; i < parityBlocks; i++) {
+      parityBuffers[i] = ByteBuffer.allocate(chunkSize);
+    }
+    encoder.encode(dataBuffers, parityBuffers);
+
+    OzoneKey key = bucket.getKey(keyName);
+    Assert.assertEquals(keyName, key.getName());
+
+    List<MockDatanodeStorage> storages = getStoragesInDatanodeOrder();
+    for (int i = 0; i < parityBlocks; i++) {
+      ByteString content =
+          getSingleBlockContent(storages.get(dataBlocks + i));
+      // Parity is arbitrary binary data: compare raw bytes, because
+      // decoding it as UTF-8 is lossy and could hide differences.
+      Assert.assertArrayEquals(parityBuffers[i].array(),
+          content.toByteArray());
+    }
+  }
+
+  /**
+   * Writes an EC key and reads each data chunk back through the regular
+   * (non-EC) read path.
+   */
+  @Test
+  public void testPutECKeyAndReadContent() throws IOException {
+    OzoneBucket bucket = writeIntoECKey(inputChunks, keyName);
+    OzoneKey key = bucket.getKey(keyName);
+    Assert.assertEquals(keyName, key.getName());
+    assertReadReturnsChunk(bucket, inputChunks[0]);
+
+    // Since EC read is not ready yet, let's use the regular read by
+    // tweaking the pipeline.
+    // Remove first node in EC pipeline. So, regular read will hit the
+    // first node in pipeline and assert for second chunk in EC data.
+    updatePipelineToKeepSingleNode(2);
+    assertReadReturnsChunk(bucket, inputChunks[1]);
+
+    updatePipelineToKeepSingleNode(3);
+    assertReadReturnsChunk(bucket, inputChunks[2]);
+  }
+
+  /**
+   * Reads up to {@code chunkSize} bytes from the start of the test key and
+   * asserts that they equal the expected chunk.
+   */
+  private void assertReadReturnsChunk(OzoneBucket bucket, byte[] expected)
+      throws IOException {
+    try (OzoneInputStream is = bucket.readKey(keyName)) {
+      byte[] fileContent = new byte[chunkSize];
+      Assert.assertEquals(expected.length, is.read(fileContent));
+      Assert.assertEquals(new String(expected, UTF_8),
+          new String(fileContent, UTF_8));
+    }
+  }
+
+  /**
+   * Returns the mock datanode storages ordered by the natural ordering of
+   * their datanodes.
+   */
+  private List<MockDatanodeStorage> getStoragesInDatanodeOrder() {
+    Map<DatanodeDetails, MockDatanodeStorage> storages =
+        factoryStub.getStorages();
+    DatanodeDetails[] dnDetails =
+        storages.keySet().toArray(new DatanodeDetails[0]);
+    Arrays.sort(dnDetails);
+    List<MockDatanodeStorage> ordered = new ArrayList<>(dnDetails.length);
+    for (DatanodeDetails dn : dnDetails) {
+      ordered.add(storages.get(dn));
+    }
+    return ordered;
+  }
+
+  /**
+   * Asserts that the storage holds exactly one block and returns its data.
+   */
+  private ByteString getSingleBlockContent(MockDatanodeStorage storage) {
+    Assert.assertEquals(1, storage.getAllBlockData().size());
+    return storage.getAllBlockData().values().iterator().next();
+  }
+
+  /**
+   * Creates a fresh volume and bucket and writes the chunks, in order,
+   * into a new key that uses the EC replication config under test.
+   *
+   * @param chunks data written to the key, one array after another.
+   * @param key    name of the key to create.
+   * @return the bucket that contains the written key.
+   */
+  private OzoneBucket writeIntoECKey(byte[][] chunks, String key)
+      throws IOException {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    try (OzoneOutputStream out = bucket.createKey(key, 2000,
+        new ECReplicationConfig(dataBlocks, parityBlocks), new HashMap<>())) {
+      for (byte[] chunk : chunks) {
+        out.write(chunk);
+      }
+    }
+    return bucket;
+  }
+
+  /**
+   * Rewrites the first key stored in the mock OM so that its only block
+   * location points at a pipeline with a single member.
+   *
+   * @param keepingNodeIndex value used for both UUID halves of the kept
+   *                         node and added to 1234 for its port, matching
+   *                         the 1-based numbering used by the allocator.
+   */
+  private void updatePipelineToKeepSingleNode(int keepingNodeIndex) {
+    Map<String, Map<String, Map<String, OzoneManagerProtocolProtos.KeyInfo>>>
+        keys = transportStub.getKeys();
+    Map<String, Map<String, OzoneManagerProtocolProtos.KeyInfo>> vol =
+        keys.values().iterator().next();
+    Map<String, OzoneManagerProtocolProtos.KeyInfo> buck =
+        vol.values().iterator().next();
+    OzoneManagerProtocolProtos.KeyInfo keyInfo =
+        buck.values().iterator().next();
+
+    HddsProtos.UUID pipelineUuid = HddsProtos.UUID.newBuilder()
+        .setLeastSigBits(1L)
+        .setMostSigBits(1L)
+        .build();
+    HddsProtos.UUID nodeUuid = HddsProtos.UUID.newBuilder()
+        .setLeastSigBits(keepingNodeIndex)
+        .setMostSigBits(keepingNodeIndex)
+        .build();
+
+    // Keeping only the given position node in pipeline.
+    HddsProtos.Pipeline pipeline = HddsProtos.Pipeline.newBuilder()
+        .setFactor(keyInfo.getFactor())
+        .setType(keyInfo.getType())
+        .setId(HddsProtos.PipelineID.newBuilder()
+            .setUuid128(pipelineUuid)
+            .build())
+        .addMembers(HddsProtos.DatanodeDetailsProto.newBuilder()
+            .setUuid128(nodeUuid)
+            .setHostName("localhost")
+            .setIpAddress("1.2.3.4")
+            .addPorts(HddsProtos.Port.newBuilder()
+                .setName("EC")
+                .setValue(1234 + keepingNodeIndex)
+                .build())
+            .build())
+        .build();
+
+    List<OzoneManagerProtocolProtos.KeyLocation> results = new ArrayList<>();
+    results.add(OzoneManagerProtocolProtos.KeyLocation.newBuilder()
+        .setPipeline(pipeline)
+        .setBlockID(HddsProtos.BlockID.newBuilder()
+            .setBlockCommitSequenceId(1L)
+            .setContainerBlockID(HddsProtos.ContainerBlockID.newBuilder()
+                .setContainerID(1L)
+                .setLocalID(0L)
+                .build())
+            .build())
+        .setOffset(0L)
+        .setLength(keyInfo.getDataSize())
+        .build());
+
+    // Copy the key metadata unchanged and swap in the new location list.
+    final OzoneManagerProtocolProtos.KeyInfo keyInfo1 =
+        OzoneManagerProtocolProtos.KeyInfo.newBuilder()
+            .setVolumeName(keyInfo.getVolumeName())
+            .setBucketName(keyInfo.getBucketName())
+            .setKeyName(keyInfo.getKeyName())
+            .setCreationTime(keyInfo.getCreationTime())
+            .setModificationTime(keyInfo.getModificationTime())
+            .setType(keyInfo.getType())
+            .setFactor(keyInfo.getFactor())
+            .setDataSize(keyInfo.getDataSize())
+            .setLatestVersion(0L)
+            .addKeyLocationList(
+                OzoneManagerProtocolProtos.KeyLocationList.newBuilder()
+                    .addAllKeyLocations(results))
+            .build();
+    buck.put(keyInfo.getKeyName(), keyInfo1);
+  }
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to