This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new 38e4cca  HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to 
accommodate all block group related ECBlockOutputStreams. (#2702)
38e4cca is described below

commit 38e4cca27367bd43631211402b9c203784fa18fa
Author: Istvan Fajth <[email protected]>
AuthorDate: Thu Oct 14 07:39:47 2021 +0200

    HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block 
group related ECBlockOutputStreams. (#2702)
---
 .../hdds/scm/storage/ECBlockOutputStream.java      |   6 +
 .../ozone/client/io/BlockOutputStreamEntry.java    |  17 +-
 .../client/io/BlockOutputStreamEntryPool.java      |  18 +-
 .../ozone/client/io/ECBlockOutputStreamEntry.java  | 265 +++++++++++++++++++--
 .../client/io/ECBlockOutputStreamEntryPool.java    | 201 ++--------------
 .../hadoop/ozone/client/io/ECKeyOutputStream.java  | 127 ++++------
 .../hadoop/ozone/client/io/KeyOutputStream.java    |   1 -
 .../client/MultiNodePipelineBlockAllocator.java    |   3 +
 .../hadoop/ozone/client/TestOzoneECClient.java     |  29 ++-
 .../client/io/TestECBlockOutputStreamEntry.java    | 128 ++++++++++
 .../ozone/client/rpc/TestECKeyOutputStream.java    |  88 ++-----
 11 files changed, 492 insertions(+), 391 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index 30bd2b7..86d7058 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -115,4 +115,10 @@ public class ECBlockOutputStream extends BlockOutputStream{
     }
     return flushFuture;
   }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    cleanup(false);
+  }
 }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index bb821b2..1fd89f6 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -42,7 +42,7 @@ import com.google.common.annotations.VisibleForTesting;
  *
  * The base implementation is handling Ratis-3 writes, with a single stream,
  * but there can be other implementations that are using a different way.
- * */
+ */
 public class BlockOutputStreamEntry extends OutputStream {
 
   private final OzoneClientConfig config;
@@ -276,7 +276,8 @@ public class BlockOutputStreamEntry extends OutputStream {
    * here.
    * @param id the last know ID of the block.
    */
-  void updateBlockID(BlockID id) {
+  @VisibleForTesting
+  protected void updateBlockID(BlockID id) {
     this.blockID = id;
   }
 
@@ -297,18 +298,6 @@ public class BlockOutputStreamEntry extends OutputStream {
     return this.pipeline;
   }
 
-  /**
-   * Gets the Pipeline based on which the location report can be sent to the 
OM.
-   * This is necessary, as implementors might use special pipeline information
-   * that can be created during commit, but not during initialization,
-   * and might need to update some Pipeline information returned in
-   * OMKeyLocationInfo.
-   * @return
-   */
-  Pipeline getPipelineForOMLocationReport(){
-    return getPipeline();
-  }
-
   long getCurrentPosition() {
     return this.currentPosition;
   }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index 9c4414f..912ba3a 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -184,7 +184,7 @@ public class BlockOutputStreamEntryPool {
             .build();
   }
 
-  void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
+  private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
     Preconditions.checkNotNull(subKeyInfo.getPipeline());
     streamEntries.add(createStreamEntry(subKeyInfo));
   }
@@ -203,7 +203,7 @@ public class BlockOutputStreamEntryPool {
     return locationInfoList;
   }
 
-  List<OmKeyLocationInfo> getOmKeyLocationInfos(
+  private List<OmKeyLocationInfo> getOmKeyLocationInfos(
       List<BlockOutputStreamEntry> streams) {
     List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
     for (BlockOutputStreamEntry streamEntry : streams) {
@@ -217,7 +217,7 @@ public class BlockOutputStreamEntryPool {
                 .setLength(streamEntry.getCurrentPosition())
                 .setOffset(0)
                 .setToken(streamEntry.getToken())
-                .setPipeline(streamEntry.getPipelineForOMLocationReport())
+                .setPipeline(streamEntry.getPipeline())
                 .build();
         locationInfoList.add(info);
       }
@@ -345,18 +345,6 @@ public class BlockOutputStreamEntryPool {
     }
   }
 
-  public int getCurrIdx(){
-    return currentStreamIndex;
-  }
-
-  public void setCurrIdx(int currIdx) {
-    this.currentStreamIndex = currIdx;
-  }
-
-  public void updateToNextStream(int rotation) {
-    currentStreamIndex = (currentStreamIndex + 1) % rotation;
-  }
-
   /**
    * Allocates a new block with OM if the current stream is closed, and new
    * writes are to be handled.
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index df04204..22a1906 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -17,56 +17,286 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.ratis.util.Preconditions.assertInstanceOf;
 
 /**
- * Helper for {@link ECBlockOutputStream}.
+ * ECBlockOutputStreamEntry manages write into EC keys' data block groups.
+ * A block group consists of data and parity blocks. For every block we have
+ * an internal ECBlockOutputStream instance with a single node pipeline, that
+ * is derived from the original EC pipeline.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private final ECReplicationConfig replicationConfig;
+  private final long length;
+
+  private ECBlockOutputStream[] blockOutputStreams;
+  private int currentStreamIdx = 0;
+
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long 
length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+    this.length = replicationConfig.getData() * length;
+  }
+
+  @Override
+  void checkStream() throws IOException {
+    if (!isInitialized()) {
+      blockOutputStreams =
+          new ECBlockOutputStream[replicationConfig.getRequiredNodes()];
+    }
+    if (blockOutputStreams[currentStreamIdx] == null) {
+      createOutputStream();
+    }
   }
 
   @Override
   void createOutputStream() throws IOException {
-    this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
-        getPipeline(), getBufferPool(), getConf(), getToken());
+    Pipeline ecPipeline = getPipeline();
+    List<DatanodeDetails> nodes = getPipeline().getNodes();
+    blockOutputStreams[currentStreamIdx] = new ECBlockOutputStream(
+        getBlockID(),
+        getXceiverClientManager(),
+        createSingleECBlockPipeline(
+            ecPipeline, nodes.get(currentStreamIdx), currentStreamIdx + 1),
+        getBufferPool(),
+        getConf(),
+        getToken());
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    if (!isInitialized()) {
+      return null;
+    }
+    checkState(blockOutputStreams[currentStreamIdx] != null);
+    return blockOutputStreams[currentStreamIdx];
+  }
+
+  @Override
+  boolean isInitialized() {
+    return blockOutputStreams != null;
+  }
+
+  @Override
+  public long getLength() {
+    return length;
+  }
+
+  public int getCurrentStreamIdx() {
+    return currentStreamIdx;
+  }
+
+  public void useNextBlockStream() {
+    currentStreamIdx++;
+  }
+
+  public void forceToFirstParityBlock(){
+    currentStreamIdx = replicationConfig.getData();
   }
 
-  public ECBlockOutputStream getOutputStream() {
-    return out;
+  public void resetToFirstEntry(){
+    currentStreamIdx = 0;
+  }
+
+  @Override
+  void incCurrentPosition() {
+    if (isWritingParity()) {
+      return;
+    }
+    super.incCurrentPosition();
+  }
+
+  @Override
+  void incCurrentPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    super.incCurrentPosition(len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for (int i = 0;
+         i <= currentStreamIdx && i < blockOutputStreams.length; i++) {
+      if (blockOutputStreams[i] != null) {
+        blockOutputStreams[i].flush();
+      }
+    }
+  }
+
+  @Override
+  boolean isClosed() {
+    if (!isInitialized()) {
+      return false;
+    }
+    return blockStreams().allMatch(BlockOutputStream::isClosed);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for (ECBlockOutputStream stream : blockOutputStreams) {
+      if (stream != null) {
+        stream.close();
+      }
+    }
+    updateBlockID(underlyingBlockID());
+  }
+
+  @Override
+  long getTotalAckDataLength() {
+    if (!isInitialized()) {
+      return 0;
+    }
+    updateBlockID(underlyingBlockID());
+    // Returning zero here. Underlying streams in EC entry are
+    // ECBlockOutputStreams, extending from BlockOutputStream, without
+    // overriding getTotalAckDataLength, and default implementation returns
+    // constant zero, so even summarizing the return value of this method
+    // from blockStreams entries would yield to 0. Once this changes, we need
+    // to revisit this, and implement a proper sum of data or all streams.
+    return 0;
+  }
+
+  /**
+   * Returns the amount of bytes that were attempted to be sent through towards
+   * the DataNodes, and the write call succeeded without an exception.
+   * In EC entries the parity writes does not count into this, as the written
+   * data length represents the attempts of the classes using the entry, and
+   * not the attempts of the entry itself.
+   * @return 0 if the stream is not initialized, the amount of data bytes that
+   *    were attempted to be written to the entry.
+   */
+  //TODO: this might become problematic, and should be tested during the
+  //      implementation of retries and error handling, as if there is a retry,
+  //      then some data might have to be written twice.
+  //      This current implementation is an assumption here.
+  //      We might need to account the parity bytes written here, or elsewhere.
+  @Override
+  long getWrittenDataLength() {
+    if (!isInitialized()) {
+      return 0;
+    }
+    return dataStreams()
+        .mapToLong(BlockOutputStream::getWrittenDataLength)
+        .sum();
+  }
+
+  @Override
+  Collection<DatanodeDetails> getFailedServers() {
+    if (!isInitialized()) {
+      return Collections.emptyList();
+    }
+
+    return blockStreams()
+        .flatMap(outputStream -> outputStream.getFailedServers().stream())
+        .collect(Collectors.toList());
+  }
+
+  @VisibleForTesting
+  Pipeline createSingleECBlockPipeline(Pipeline ecPipeline,
+      DatanodeDetails node, int replicaIndex) {
+    Map<DatanodeDetails, Integer> indiciesForSinglePipeline = new HashMap<>();
+    indiciesForSinglePipeline.put(node, replicaIndex);
+    return Pipeline.newBuilder()
+        .setId(ecPipeline.getId())
+        .setReplicationConfig(ecPipeline.getReplicationConfig())
+        .setState(ecPipeline.getPipelineState())
+        .setNodes(ImmutableList.of(node))
+        .setReplicaIndexes(indiciesForSinglePipeline)
+        .build();
   }
 
   void executePutBlock() throws IOException {
-    this.out.executePutBlock(false, true);
+    if (!isInitialized()) {
+      return;
+    }
+    int failedStreams = 0;
+    for (ECBlockOutputStream stream : blockOutputStreams) {
+      if (stream == null) {
+        continue;
+      }
+      if (!stream.isClosed()) {
+        stream.executePutBlock(false, true);
+      } else {
+        failedStreams++;
+      }
+      if(failedStreams > replicationConfig.getParity()) {
+        throw new IOException(
+            "There are " + failedStreams + " block write failures,"
+                + " supported tolerance: " + replicationConfig.getParity());
+      }
+    }
+  }
+
+  private BlockID underlyingBlockID() {
+    if (blockOutputStreams[0] == null) {
+      return null;
+    }
+    // blockID is the same for EC blocks inside one block group managed by
+    // this entry, so updating based on the first stream, as when we write any
+    // data that is surely exists.
+    return blockOutputStreams[0].getBlockID();
+  }
+
+  private boolean isWritingParity() {
+    return currentStreamIdx >= replicationConfig.getData();
+  }
+
+  private Stream<ECBlockOutputStream> blockStreams() {
+    return Arrays.stream(blockOutputStreams).filter(Objects::nonNull);
   }
 
-  public boolean isParityStreamEntry() {
-    return this.isParityStreamEntry;
+  private Stream<ECBlockOutputStream> dataStreams() {
+    return Arrays.stream(blockOutputStreams)
+        .limit(replicationConfig.getData())
+        .filter(Objects::nonNull);
   }
 
   /**
    * Builder class for ChunkGroupOutputStreamEntry.
    * */
   public static class Builder {
-
     private BlockID blockID;
     private String key;
     private XceiverClientFactory xceiverClientManager;
@@ -75,7 +305,6 @@ public class ECBlockOutputStreamEntry extends 
BlockOutputStreamEntry{
     private BufferPool bufferPool;
     private Token<OzoneBlockTokenIdentifier> token;
     private OzoneClientConfig config;
-    private boolean isParityStreamEntry;
 
     public ECBlockOutputStreamEntry.Builder setBlockID(BlockID bID) {
       this.blockID = bID;
@@ -123,12 +352,6 @@ public class ECBlockOutputStreamEntry extends 
BlockOutputStreamEntry{
       return this;
     }
 
-    public ECBlockOutputStreamEntry.Builder setIsParityStreamEntry(
-        boolean isParity) {
-      this.isParityStreamEntry = isParity;
-      return this;
-    }
-
     public ECBlockOutputStreamEntry build() {
       return new ECBlockOutputStreamEntry(blockID,
           key,
@@ -136,7 +359,7 @@ public class ECBlockOutputStreamEntry extends 
BlockOutputStreamEntry{
           pipeline,
           length,
           bufferPool,
-          token, config, isParityStreamEntry);
+          token, config);
     }
   }
 }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
index aa27a72..e379956 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
@@ -17,33 +17,27 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
 /**
- * This class manages the stream entries list and handles block allocation
- * from OzoneManager for EC writes.
+ * {@link BlockOutputStreamEntryPool} is responsible to manage OM communication
+ * regarding writing a block to Ozone in a non-EC write case.
+ * The basic operations are fine for us but we need a specific
+ * {@link ECBlockOutputStreamEntry} implementation to handle writing EC block
+ * groups, this class implements the logic that handles the specific EC 
entries'
+ * instantiation and retrieval from the pool.
+ *
+ * @see ECKeyOutputStream
+ * @see BlockOutputStreamEntryPool
+ * @see ECBlockOutputStreamEntry
  */
 public class ECBlockOutputStreamEntryPool extends BlockOutputStreamEntryPool {
-  private final List<BlockOutputStreamEntry> finishedStreamEntries;
-  private final ECReplicationConfig ecReplicationConfig;
 
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   public ECBlockOutputStreamEntryPool(OzoneClientConfig config,
@@ -60,173 +54,26 @@ public class ECBlockOutputStreamEntryPool extends 
BlockOutputStreamEntryPool {
     super(config, omClient, requestId, replicationConfig, uploadID, partNumber,
         isMultipart, info, unsafeByteBufferConversion, xceiverClientFactory,
         openID);
-    this.finishedStreamEntries = new ArrayList<>();
     assert replicationConfig instanceof ECReplicationConfig;
-    this.ecReplicationConfig = (ECReplicationConfig) replicationConfig;
   }
 
   @Override
-  void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
-    Preconditions.checkNotNull(subKeyInfo.getPipeline());
-    List<DatanodeDetails> nodes = subKeyInfo.getPipeline().getNodes();
-    for (int i = 0; i < nodes.size(); i++) {
-      List<DatanodeDetails> nodeStatus = new ArrayList<>();
-      nodeStatus.add(nodes.get(i));
-      Map<DatanodeDetails, Integer> nodeVsIdx = new HashMap<>();
-      nodeVsIdx.put(nodes.get(i), i + 1);
-      Pipeline pipeline =
-          Pipeline.newBuilder().setId(subKeyInfo.getPipeline().getId())
-              .setReplicationConfig(
-                  subKeyInfo.getPipeline().getReplicationConfig())
-              .setState(subKeyInfo.getPipeline().getPipelineState())
-              .setNodes(nodeStatus).setReplicaIndexes(nodeVsIdx).build();
-
-      ECBlockOutputStreamEntry.Builder builder =
-          new ECBlockOutputStreamEntry.Builder()
-              .setBlockID(subKeyInfo.getBlockID()).setKey(getKeyName())
-              .setXceiverClientManager(getXceiverClientFactory())
-              .setPipeline(pipeline).setConfig(getConfig())
-              .setLength(subKeyInfo.getLength()).setBufferPool(getBufferPool())
-              .setToken(subKeyInfo.getToken())
-              .setIsParityStreamEntry(i >= ecReplicationConfig.getData());
-      getStreamEntries().add(builder.build());
-    }
-  }
-
-  public List<OmKeyLocationInfo> getLocationInfoList() {
-    List<OmKeyLocationInfo> locationInfoList;
-    List<OmKeyLocationInfo> currBlocksLocationInfoList =
-        getOmKeyLocationInfos(getStreamEntries());
-    List<OmKeyLocationInfo> prevBlksKeyLocationInfos =
-        getOmKeyLocationInfos(finishedStreamEntries);
-    prevBlksKeyLocationInfos.addAll(currBlocksLocationInfoList);
-    locationInfoList = prevBlksKeyLocationInfos;
-    return locationInfoList;
+  BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) {
+    return
+        new ECBlockOutputStreamEntry.Builder()
+            .setBlockID(subKeyInfo.getBlockID())
+            .setKey(getKeyName())
+            .setXceiverClientManager(getXceiverClientFactory())
+            .setPipeline(subKeyInfo.getPipeline())
+            .setConfig(getConfig())
+            .setLength(subKeyInfo.getLength())
+            .setBufferPool(getBufferPool())
+            .setToken(subKeyInfo.getToken())
+            .build();
   }
 
   @Override
-  long getKeyLength() {
-    long totalLength = getStreamEntries().stream()
-        .filter(c -> !((ECBlockOutputStreamEntry) c).isParityStreamEntry())
-        .mapToLong(BlockOutputStreamEntry::getCurrentPosition).sum();
-
-    totalLength += finishedStreamEntries.stream()
-        .filter(c -> !((ECBlockOutputStreamEntry) c).isParityStreamEntry())
-        .mapToLong(BlockOutputStreamEntry::getCurrentPosition).sum();
-    return totalLength;
+  public ECBlockOutputStreamEntry getCurrentStreamEntry() {
+    return (ECBlockOutputStreamEntry) super.getCurrentStreamEntry();
   }
-
-  @Override
-  List<OmKeyLocationInfo> getOmKeyLocationInfos(
-      List<BlockOutputStreamEntry> streams) {
-    List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
-    Map<BlockID, ArrayList<ECBlockOutputStreamEntry>> blkIdVsStream =
-        new LinkedHashMap<>();
-
-    for (BlockOutputStreamEntry streamEntry : streams) {
-      BlockID blkID = streamEntry.getBlockID();
-      final ArrayList<ECBlockOutputStreamEntry> stream =
-          blkIdVsStream.getOrDefault(blkID, new ArrayList<>());
-      stream.add((ECBlockOutputStreamEntry) streamEntry);
-      blkIdVsStream.put(blkID, stream);
-    }
-
-    final Iterator<Map.Entry<BlockID, ArrayList<ECBlockOutputStreamEntry>>>
-        iterator = blkIdVsStream.entrySet().iterator();
-
-    while (iterator.hasNext()) {
-      final Map.Entry<BlockID, ArrayList<ECBlockOutputStreamEntry>>
-          blkGrpIDVsStreams = iterator.next();
-      final ArrayList<ECBlockOutputStreamEntry> blkGrpStreams =
-          blkGrpIDVsStreams.getValue();
-      List<DatanodeDetails> nodeStatus = new ArrayList<>();
-      Map<DatanodeDetails, Integer> nodeVsIdx = new HashMap<>();
-
-      // Assumption: Irrespective of failures, stream entries must have updated
-      // the lengths.
-      long blkGRpLen = 0;
-      for (ECBlockOutputStreamEntry internalBlkStream : blkGrpStreams) {
-        blkGRpLen += !(internalBlkStream).isParityStreamEntry() ?
-            internalBlkStream.getCurrentPosition() :
-            0;
-        // In EC, only one node per internal block stream.
-        final DatanodeDetails nodeDetails =
-            internalBlkStream.getPipeline().getNodeSet().iterator().next();
-        nodeStatus.add(nodeDetails);
-        nodeVsIdx.put(nodeDetails,
-            internalBlkStream.getPipeline().getReplicaIndex(nodeDetails));
-      }
-      nodeStatus.sort((o1, o2) -> nodeVsIdx.get(o1) - nodeVsIdx.get(o2));
-      final BlockOutputStreamEntry firstStreamInBlockGrp = 
blkGrpStreams.get(0);
-      Pipeline blockGrpPipeline = Pipeline.newBuilder()
-          .setId(firstStreamInBlockGrp.getPipeline().getId())
-          .setReplicationConfig(
-              firstStreamInBlockGrp.getPipeline().getReplicationConfig())
-          .setState(firstStreamInBlockGrp.getPipeline().getPipelineState())
-          .setNodes(nodeStatus).setReplicaIndexes(nodeVsIdx).build();
-      // Commit only those blocks to OzoneManager which are not empty
-      if (blkGRpLen != 0) {
-        OmKeyLocationInfo info = new OmKeyLocationInfo.Builder()
-            .setBlockID(blkGrpIDVsStreams.getKey()).setLength(blkGRpLen)
-            .setOffset(0).setToken(firstStreamInBlockGrp.getToken())
-            .setPipeline(blockGrpPipeline).build();
-        locationInfoList.add(info);
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("block written " + firstStreamInBlockGrp
-            .getBlockID() + ", length " + blkGRpLen + " bcsID "
-            + firstStreamInBlockGrp.getBlockID().getBlockCommitSequenceId());
-      }
-    }
-    return locationInfoList;
-  }
-
-  public void endECBlock() throws IOException {
-    List<BlockOutputStreamEntry> entries = getStreamEntries();
-    if (entries.size() > 0) {
-      for (int i = entries.size() - 1; i >= 0; i--) {
-        finishedStreamEntries.add(entries.remove(i));
-      }
-    }
-
-    for (BlockOutputStreamEntry entry : finishedStreamEntries) {
-      entry.close();
-    }
-    super.cleanup();
-  }
-
-  void executePutBlockForAll() throws IOException {
-    List<BlockOutputStreamEntry> streamEntries = getStreamEntries();
-    int failedStreams = 0;
-    for (int i = 0; i < streamEntries.size(); i++) {
-      ECBlockOutputStreamEntry ecBlockOutputStreamEntry =
-          (ECBlockOutputStreamEntry) streamEntries.get(i);
-      if (!ecBlockOutputStreamEntry.isClosed()) {
-        if(!ecBlockOutputStreamEntry.isInitialized()){
-          // Stream not initialized. Means this stream was not used to write.
-          continue;
-        }
-        ecBlockOutputStreamEntry.executePutBlock();
-      }else{
-        failedStreams++;
-      }
-    }
-    if(failedStreams > ecReplicationConfig.getParity()) {
-      throw new IOException(
-          "There are " + failedStreams + " failures than supported tolerance: "
-              + ecReplicationConfig.getParity());
-    }
-  }
-
-  void cleanupAll() {
-    super.cleanup();
-    if (finishedStreamEntries != null) {
-      finishedStreamEntries.clear();
-    }
-  }
-
-  public void updateToNextStream(int rotation) {
-    super.setCurrIdx((getCurrIdx() + 1) % rotation);
-  }
-
 }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index ec9402c..799e15d 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -24,7 +24,6 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -78,7 +77,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
       LoggerFactory.getLogger(KeyOutputStream.class);
 
   private boolean closed;
-  private FileEncryptionInfo feInfo;
   // how much of data is actually written yet to underlying stream
   private long offset;
   // how much data has been ingested into the stream
@@ -126,9 +124,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
             replicationConfig, uploadID, partNumber, isMultipart, info,
             unsafeByteBufferConversion, xceiverClientManager, handler.getId());
 
-    // Retrieve the file encryption key info, null if file is not in
-    // encrypted bucket.
-    this.feInfo = info.getFileEncryptionInfo();
     this.isException = false;
     this.writeOffset = 0;
     OzoneConfiguration conf = new OzoneConfiguration();
@@ -181,71 +176,81 @@ public class ECKeyOutputStream extends KeyOutputStream {
     if (len == 0) {
       return;
     }
+    blockOutputStreamEntryPool.allocateBlockIfNeeded();
 
+    int currentStreamIdx = blockOutputStreamEntryPool.getCurrentStreamEntry()
+        .getCurrentStreamIdx();
     int currentChunkBufferRemainingLength =
-        ecChunkBufferCache.dataBuffers[blockOutputStreamEntryPool.getCurrIdx()]
-            .remaining();
+        ecChunkBufferCache.dataBuffers[currentStreamIdx].remaining();
     int currentChunkBufferLen =
-        ecChunkBufferCache.dataBuffers[blockOutputStreamEntryPool.getCurrIdx()]
+        ecChunkBufferCache.dataBuffers[currentStreamIdx]
             .position();
-    int maxLenToCurrChunkBuffer = (int) Math.min(len, ecChunkSize);
+    int maxLenToCurrChunkBuffer = Math.min(len, ecChunkSize);
     int currentWriterChunkLenToWrite =
         Math.min(currentChunkBufferRemainingLength, maxLenToCurrChunkBuffer);
-    int pos = handleDataWrite(blockOutputStreamEntryPool.getCurrIdx(), b, off,
+    int pos = handleDataWrite(currentStreamIdx, b, off,
         currentWriterChunkLenToWrite,
         currentChunkBufferLen + currentWriterChunkLenToWrite == ecChunkSize);
-    checkAndWriteParityCells(pos);
+    //TODO: do we really need this call?
+    checkAndWriteParityCells(pos, false);
     int remLen = len - currentWriterChunkLenToWrite;
     int iters = remLen / ecChunkSize;
     int lastCellSize = remLen % ecChunkSize;
     off += currentWriterChunkLenToWrite;
 
     while (iters > 0) {
-      pos = handleDataWrite(blockOutputStreamEntryPool.getCurrIdx(), b, off,
-          ecChunkSize, true);
+      currentStreamIdx = blockOutputStreamEntryPool.getCurrentStreamEntry()
+          .getCurrentStreamIdx();
+      pos = handleDataWrite(currentStreamIdx, b, off, ecChunkSize, true);
       off += ecChunkSize;
       iters--;
-      checkAndWriteParityCells(pos);
+      checkAndWriteParityCells(pos, iters > 0 || remLen > 0);
     }
 
     if (lastCellSize > 0) {
-      pos = handleDataWrite(blockOutputStreamEntryPool.getCurrIdx(), b, off,
+      currentStreamIdx = blockOutputStreamEntryPool.getCurrentStreamEntry()
+          .getCurrentStreamIdx();
+      pos = handleDataWrite(currentStreamIdx, b, off,
           lastCellSize, false);
-      checkAndWriteParityCells(pos);
+      checkAndWriteParityCells(pos, false);
     }
     writeOffset += len;
   }
 
-  private void checkAndWriteParityCells(int lastDataBuffPos)
+  private void checkAndWriteParityCells(int lastDataBuffPos,
+      boolean allocateBlockIfFull)
       throws IOException {
     //check data blocks finished
     //If index is > datanum blks
-    if (blockOutputStreamEntryPool
-        .getCurrIdx() == numDataBlks && lastDataBuffPos == ecChunkSize) {
+    int currentStreamIdx = blockOutputStreamEntryPool.getCurrentStreamEntry()
+        .getCurrentStreamIdx();
+    if (currentStreamIdx == numDataBlks && lastDataBuffPos == ecChunkSize) {
       //Lets encode and write
-      handleParityWrites(ecChunkSize);
+      handleParityWrites(ecChunkSize, allocateBlockIfFull);
     }
   }
 
-  private void handleParityWrites(int parityCellSize) throws IOException {
+  private void handleParityWrites(int parityCellSize,
+      boolean allocateBlockIfFull)
+      throws IOException {
     writeParityCells(parityCellSize);
     // By this time, we should have finished full stripe. So, lets call
     // executePutBlock for all.
     // TODO: we should alter the put block calls to share CRC to each stream.
-    blockOutputStreamEntryPool.executePutBlockForAll();
+    ECBlockOutputStreamEntry streamEntry =
+        blockOutputStreamEntryPool.getCurrentStreamEntry();
+    streamEntry.executePutBlock();
     ecChunkBufferCache.clear(parityCellSize);
 
-    // check if block ends?
-    if (shouldEndBlockGroup()) {
-      blockOutputStreamEntryPool.endECBlock();
-      currentBlockGroupLen = 0;
+    if (streamEntry.getRemaining() <= 0) {
+      streamEntry.close();
+      if (allocateBlockIfFull) {
+        blockOutputStreamEntryPool.allocateBlockIfNeeded();
+      }
+    } else {
+      streamEntry.resetToFirstEntry();
     }
-  }
-
-  private boolean shouldEndBlockGroup() {
-    return currentBlockGroupLen == numDataBlks * blockOutputStreamEntryPool
-        .getStreamEntries().get(blockOutputStreamEntryPool.getCurrIdx())
-        .getLength();
+    currentBlockGroupLen = 0;
   }
 
   void writeParityCells(int parityCellSize) throws IOException {
@@ -257,10 +262,11 @@ public class ECKeyOutputStream extends KeyOutputStream {
       buffers[i].flip();
     }
     encoder.encode(buffers, parityBuffers);
+    blockOutputStreamEntryPool
+        .getCurrentStreamEntry().forceToFirstParityBlock();
     for (int i =
          numDataBlks; i < (this.numDataBlks + this.numParityBlks); i++) {
       // Move the stream entry cursor to parity block index
-      blockOutputStreamEntryPool.setCurrIdx(i);
       handleParityWrite(i, parityBuffers[i - numDataBlks].array(), 0,
           ecChunkSize, true);
     }
@@ -272,8 +278,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
     handleOutputStreamWrite(currIdx, b, off, len, isFullCell, false);
 
     if(pos == ecChunkSize){
-      blockOutputStreamEntryPool
-          .updateToNextStream(numDataBlks + numParityBlks);
+      blockOutputStreamEntryPool.getCurrentStreamEntry().useNextBlockStream();
     }
     return pos;
   }
@@ -281,22 +286,17 @@ public class ECKeyOutputStream extends KeyOutputStream {
   private void handleParityWrite(int currIdx, byte[] b, int off, long len,
       boolean isFullCell) throws IOException {
     handleOutputStreamWrite(currIdx, b, off, len, isFullCell, true);
-    blockOutputStreamEntryPool
-        .updateToNextStream(numDataBlks + numParityBlks);
+    blockOutputStreamEntryPool.getCurrentStreamEntry().useNextBlockStream();
   }
 
   private void handleOutputStreamWrite(int currIdx, byte[] b, int off, long len,
       boolean isFullCell, boolean isParity) throws IOException {
 
     BlockOutputStreamEntry current =
-        blockOutputStreamEntryPool.allocateBlockIfNeeded();
+        blockOutputStreamEntryPool.getCurrentStreamEntry();
     int writeLengthToCurrStream =
         Math.min((int) len, (int) current.getRemaining());
     currentBlockGroupLen += isParity ? 0 : writeLengthToCurrStream;
-    if (current.getRemaining() <= 0) {
-      // since the current block is already written close the stream.
-      closeCurrentStream(StreamAction.CLOSE);
-    }
 
     len -= writeLengthToCurrStream;
     if (isFullCell) {
@@ -421,32 +421,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
     }
   }
 
-  private void closeCurrentStream(StreamAction op) throws IOException {
-    if (!blockOutputStreamEntryPool.isEmpty()) {
-      List<BlockOutputStreamEntry> allStreamEntries =
-          blockOutputStreamEntryPool.getStreamEntries();
-      for (int i = 0; i < allStreamEntries.size(); i++) {
-        while (true) {
-          try {
-            BlockOutputStreamEntry entry = allStreamEntries.get(i);
-            if (entry != null) {
-              try {
-                handleStreamAction(entry, op);
-              } catch (IOException ioe) {
-                handleException(entry, ioe);
-                continue;
-              }
-            }
-            return;
-          } catch (Exception e) {
-            markStreamClosed();
-            throw e;
-          }
-        }
-      }
-    }
-  }
-
   private void handleStreamAction(BlockOutputStreamEntry entry, StreamAction op)
       throws IOException {
     Collection<DatanodeDetails> failedServers = entry.getFailedServers();
@@ -484,11 +458,10 @@ public class ECKeyOutputStream extends KeyOutputStream {
     }
     closed = true;
     try {
-      handleFlushOrCloseAllStreams(StreamAction.CLOSE);
       if(isPartialStripe()){
         ByteBuffer bytesToWrite =
             ecChunkBufferCache.getDataBuffers()[blockOutputStreamEntryPool
-                .getCurrIdx()];
+                .getCurrentStreamEntry().getCurrentStreamIdx()];
 
         // Finish writing the current partial cached chunk
         if (bytesToWrite.position() % ecChunkSize != 0) {
@@ -509,16 +482,17 @@ public class ECKeyOutputStream extends KeyOutputStream {
         final int parityCellSize =
             lastStripeSize < ecChunkSize ? lastStripeSize : ecChunkSize;
         addPadding(parityCellSize);
-        handleParityWrites(parityCellSize);
+        handleParityWrites(parityCellSize, false);
       }
+      handleFlushOrCloseAllStreams(StreamAction.CLOSE);
 
       if (!isException) {
         Preconditions.checkArgument(writeOffset == offset);
       }
-      blockOutputStreamEntryPool.endECBlock();
+      blockOutputStreamEntryPool.getCurrentStreamEntry().close();
       blockOutputStreamEntryPool.commitKey(offset);
     } finally {
-      blockOutputStreamEntryPool.cleanupAll();
+      blockOutputStreamEntryPool.cleanup();
     }
     ecChunkBufferCache.release();
   }
@@ -552,10 +526,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
     return blockOutputStreamEntryPool.getCommitUploadPartInfo();
   }
 
-  public FileEncryptionInfo getFileEncryptionInfo() {
-    return feInfo;
-  }
-
   @VisibleForTesting
   public ExcludeList getExcludeList() {
     return blockOutputStreamEntryPool.getExcludeList();
@@ -685,7 +655,8 @@ public class ECKeyOutputStream extends KeyOutputStream {
     private int addToDataBuffer(int i, byte[] b, int off, int len) {
       final ByteBuffer buf = dataBuffers[i];
       final int pos = buf.position() + len;
-      Preconditions.checkState(pos <= cellSize);
+      Preconditions.checkState(pos <= cellSize,
+          "Position("+pos+") is greater than the cellSize("+cellSize+").");
       buf.put(b, off, len);
       return pos;
     }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index d5f6f5d..4e0fbf9 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -150,7 +150,6 @@ public class KeyOutputStream extends OutputStream {
             unsafeByteBufferConversion,
             xceiverClientManager,
             handler.getId());
-
     this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException(
         config.getMaxRetryCount(), config.getRetryInterval());
     this.retryCount = 0;
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
index f5d2de3..2c315b7 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
@@ -61,6 +61,9 @@ public class MultiNodePipelineBlockAllocator implements MockBlockAllocator {
             .setIpAddress("1.2.3.4").addPorts(
                HddsProtos.Port.newBuilder().setName("RATIS").setValue(1234 + i)
                     .build()).build());
+        if (keyArgs.getType() == HddsProtos.ReplicationType.EC) {
+          builder.addMemberReplicaIndexes(i);
+        }
       }
       if (keyArgs.getType() == HddsProtos.ReplicationType.EC) {
         builder.setEcReplicationConfig(keyArgs.getEcReplicationConfig());
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index 1ba25f3..c66d0f6 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -66,6 +66,7 @@ public class TestOzoneECClient {
   private int chunkSize = 1024;
   private int dataBlocks = 3;
   private int parityBlocks = 2;
+  private int inputSize = chunkSize * dataBlocks;
   private OzoneClient client;
   private ObjectStore store;
   private String keyName = UUID.randomUUID().toString();
@@ -151,13 +152,13 @@ public class TestOzoneECClient {
   @Test
   public void testPutECKeyAndCheckParityData() throws IOException {
     OzoneBucket bucket = writeIntoECKey(inputChunks, keyName, null);
-    final ByteBuffer[] dataBuffers = new ByteBuffer[3];
+    final ByteBuffer[] dataBuffers = new ByteBuffer[dataBlocks];
     for (int i = 0; i < inputChunks.length; i++) {
       dataBuffers[i] = ByteBuffer.wrap(inputChunks[i]);
     }
     final ByteBuffer[] parityBuffers = new ByteBuffer[parityBlocks];
     for (int i = 0; i < parityBlocks; i++) {
-      parityBuffers[i] = ByteBuffer.allocate(1024);
+      parityBuffers[i] = ByteBuffer.allocate(chunkSize);
     }
     encoder.encode(dataBuffers, parityBuffers);
     OzoneKey key = bucket.getKey(keyName);
@@ -186,7 +187,7 @@ public class TestOzoneECClient {
     OzoneKey key = bucket.getKey(keyName);
     Assert.assertEquals(keyName, key.getName());
     try (OzoneInputStream is = bucket.readKey(keyName)) {
-      byte[] fileContent = new byte[1024];
+      byte[] fileContent = new byte[chunkSize];
       for (int i=0; i<dataBlocks; i++) {
         Assert.assertEquals(inputChunks[i].length, is.read(fileContent));
         Assert.assertTrue(Arrays.equals(inputChunks[i], fileContent));
@@ -206,7 +207,7 @@ public class TestOzoneECClient {
 
     // create key without mentioning replication config. Since we set EC
     // replication in bucket, key should be EC key.
-    try (OzoneOutputStream out = bucket.createKey("mykey", 2000)) {
+    try (OzoneOutputStream out = bucket.createKey("mykey", inputSize)) {
       Assert.assertTrue(out.getOutputStream() instanceof ECKeyOutputStream);
       for (int i = 0; i < inputChunks.length; i++) {
         out.write(inputChunks[i]);
@@ -278,7 +279,7 @@ public class TestOzoneECClient {
   }
 
   @Test
-  public void testMultipleChunksWithPartialChunkInSigleWripeOp()
+  public void testMultipleChunksWithPartialChunkInSingleWriteOp()
+  public void testMultipleChunksWithPartialChunkInSingleWriteOp()
       throws IOException {
     final int partialChunkLen = 10;
     final int numFullChunks = 9;
@@ -310,7 +311,7 @@ public class TestOzoneECClient {
 
     // create key without mentioning replication config. Since we set EC
     // replication in bucket, key should be EC key.
-    try (OzoneOutputStream out = bucket.createKey("mykey", 1024)) {
+    try (OzoneOutputStream out = bucket.createKey("mykey", 6*inputSize)) {
       Assert.assertTrue(out.getOutputStream() instanceof ECKeyOutputStream);
       // Block Size is 2kb, so to create 3 blocks we need 6 iterations here
       for (int j = 0; j < 6; j++) {
@@ -348,7 +349,7 @@ public class TestOzoneECClient {
     volume.createBucket(bucketName);
     OzoneBucket bucket = volume.getBucket(bucketName);
 
-    try (OzoneOutputStream out = bucket.createKey(keyName, 2000,
+    try (OzoneOutputStream out = bucket.createKey(keyName, inputSize,
         new ECReplicationConfig(dataBlocks, parityBlocks,
             ECReplicationConfig.EcCodec.RS, chunkSize), new HashMap<>())) {
       for (int i = 0; i < inputChunks[0].length; i++) {
@@ -367,7 +368,7 @@ public class TestOzoneECClient {
     volume.createBucket(bucketName);
     OzoneBucket bucket = volume.getBucket(bucketName);
 
-    try (OzoneOutputStream out = bucket.createKey(keyName, 2000,
+    try (OzoneOutputStream out = bucket.createKey(keyName, inputSize,
         new ECReplicationConfig(dataBlocks, parityBlocks,
             ECReplicationConfig.EcCodec.RS, chunkSize), new HashMap<>())) {
       for (int i = 0; i < inputChunks[0].length-1; i++) {
@@ -392,7 +393,8 @@ public class TestOzoneECClient {
         Arrays.copyOf(inputChunks[inputChunks.length - 1],
             inputChunks[inputChunks.length - 1].length - 1);
 
-    try (OzoneOutputStream out = bucket.createKey(keyName, 2000,
+    int inSize = chunkSize * (inputChunks.length - 1) + lastChunk.length;
+    try (OzoneOutputStream out = bucket.createKey(keyName, inSize,
         new ECReplicationConfig(dataBlocks, parityBlocks,
             ECReplicationConfig.EcCodec.RS, chunkSize), new HashMap<>())) {
       for (int i = 0; i < inputChunks.length - 1; i++) {
@@ -405,7 +407,7 @@ public class TestOzoneECClient {
     }
 
     try (OzoneInputStream is = bucket.readKey(keyName)) {
-      byte[] fileContent = new byte[1024];
+      byte[] fileContent = new byte[chunkSize];
       for (int i=0; i<2; i++) {
         Assert.assertEquals(inputChunks[i].length, is.read(fileContent));
         Assert.assertTrue(Arrays.equals(inputChunks[i], fileContent));
@@ -436,9 +438,10 @@ public class TestOzoneECClient {
     }
     OzoneBucket bucket = volume.getBucket(bucketName);
 
-    try (OzoneOutputStream out = bucket.createKey(key, 4096,
-        new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
-            chunkSize), new HashMap<>())) {
+    int size = (int) Arrays.stream(chunks).mapToLong(a -> a.length).sum();
+    try (OzoneOutputStream out = bucket.createKey(key, size,
+        new ECReplicationConfig(dataBlocks, parityBlocks,
+            ECReplicationConfig.EcCodec.RS, chunkSize), new HashMap<>())) {
       for (int i = 0; i < chunks.length; i++) {
         out.write(chunks[i]);
       }
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockOutputStreamEntry.java
new file mode 100644
index 0000000..4b87ef7
--- /dev/null
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockOutputStreamEntry.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * {@link ECBlockOutputStreamEntry} tests.
+ */
+public class TestECBlockOutputStreamEntry {
+
+  @Test
+  public void
+      testAcquireDifferentClientForECBlocksOnTheSameHostButDifferentPort()
+      throws IOException {
+    PipelineID randomId = PipelineID.randomId();
+    ReplicationConfig ecReplicationConfig =
+        new ECReplicationConfig("RS-3-2-1024k");
+    DatanodeDetails node1 = aNode("127.0.0.1", "localhost", 2001);
+    DatanodeDetails node2 = aNode("127.0.0.1", "localhost", 2002);
+    DatanodeDetails node3 = aNode("127.0.0.1", "localhost", 2003);
+    DatanodeDetails node4 = aNode("127.0.0.1", "localhost", 2004);
+    DatanodeDetails node5 = aNode("127.0.0.1", "localhost", 2005);
+    List<DatanodeDetails> nodes =
+        Arrays.asList(node1, node2, node3, node4, node5);
+    Pipeline anECPipeline = Pipeline.newBuilder()
+        .setId(randomId)
+        .setReplicationConfig(ecReplicationConfig)
+        .setState(Pipeline.PipelineState.OPEN)
+        .setNodes(nodes)
+        .build();
+    XceiverClientManager manager =
+        new XceiverClientManager(new OzoneConfiguration());
+    HashSet<XceiverClientSpi> clients = new HashSet<>();
+    ECBlockOutputStreamEntry entry = new ECBlockOutputStreamEntry.Builder()
+        .setXceiverClientManager(manager)
+        .setPipeline(anECPipeline)
+        .build();
+    for (int i = 0; i < nodes.size(); i++) {
+      clients.add(
+          manager.acquireClient(
+              entry.createSingleECBlockPipeline(
+                  anECPipeline, nodes.get(i), i
+              )));
+    }
+    assertEquals(5, clients.size());
+  }
+
+  @Test
+  public void
+      testAcquireDifferentClientForECBlocksOnTheSameHostWithSomeOnSamePortAlso()
+      throws IOException {
+    PipelineID randomId = PipelineID.randomId();
+    ReplicationConfig ecReplicationConfig =
+        new ECReplicationConfig("RS-3-2-1024k");
+    DatanodeDetails node1 = aNode("127.0.0.1", "localhost", 2001);
+    DatanodeDetails node2 = aNode("127.0.0.1", "localhost", 2001);
+    DatanodeDetails node3 = aNode("127.0.0.1", "localhost", 2003);
+    DatanodeDetails node4 = aNode("127.0.0.1", "localhost", 2001);
+    DatanodeDetails node5 = aNode("127.0.0.1", "localhost", 2005);
+    List<DatanodeDetails> nodes =
+        Arrays.asList(node1, node2, node3, node4, node5);
+    Pipeline anECPipeline = Pipeline.newBuilder()
+        .setId(randomId)
+        .setReplicationConfig(ecReplicationConfig)
+        .setState(Pipeline.PipelineState.OPEN)
+        .setNodes(nodes)
+        .build();
+    XceiverClientManager manager =
+        new XceiverClientManager(new OzoneConfiguration());
+    HashSet<XceiverClientSpi> clients = new HashSet<>();
+    ECBlockOutputStreamEntry entry = new ECBlockOutputStreamEntry.Builder()
+        .setXceiverClientManager(manager)
+        .setPipeline(anECPipeline)
+        .build();
+    for (int i = 0; i < nodes.size(); i++) {
+      clients.add(
+          manager.acquireClient(
+              entry.createSingleECBlockPipeline(
+                  anECPipeline, nodes.get(i), i
+              )));
+    }
+    assertEquals(3, clients.size());
+    assertEquals(1, clients.stream().filter(c -> c.getRefcount() == 3).count());
+    assertEquals(2, clients.stream().filter(c -> c.getRefcount() == 1).count());
+  }
+
+  private DatanodeDetails aNode(String ip, String hostName, int port) {
+    return DatanodeDetails.newBuilder()
+        .setUuid(UUID.randomUUID())
+        .setIpAddress(ip)
+        .setHostName(hostName)
+        .addPort(
+            DatanodeDetails.newPort(DatanodeDetails.Port.Name.STANDALONE, port))
+        .build();
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index 84ccd44..6f12b62 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@ -16,7 +16,6 @@
  */
 package org.apache.hadoop.ozone.client.rpc;
 
-import com.google.common.cache.Cache;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
@@ -25,9 +24,6 @@ import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.BucketArgs;
@@ -37,7 +33,6 @@ import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.OzoneKey;
 import org.apache.hadoop.ozone.client.OzoneVolume;
-import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
 import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
@@ -49,12 +44,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -79,6 +70,7 @@ public class TestECKeyOutputStream {
   private static String keyString;
   private static int dataBlocks = 3;
   private static int parityBlocks = 2;
+  private static int inputSize = dataBlocks * chunkSize;
   private static byte[][] inputChunks = new byte[dataBlocks][chunkSize];
 
   /**
@@ -132,8 +124,8 @@ public class TestECKeyOutputStream {
   public void testCreateKeyWithECReplicationConfig() throws Exception {
     try (OzoneOutputStream key = TestHelper
         .createKey(keyString, new ECReplicationConfig(3, 2,
-            ECReplicationConfig.EcCodec.RS, chunkSize), 2000, objectStore,
-            volumeName, bucketName)) {
+              ECReplicationConfig.EcCodec.RS, chunkSize), inputSize,
+            objectStore, volumeName, bucketName)) {
       Assert.assertTrue(key.getOutputStream() instanceof ECKeyOutputStream);
     }
   }
@@ -142,7 +134,7 @@ public class TestECKeyOutputStream {
   public void testCreateKeyWithOutBucketDefaults() throws Exception {
     OzoneVolume volume = objectStore.getVolume(volumeName);
     OzoneBucket bucket = volume.getBucket(bucketName);
-    try (OzoneOutputStream out = bucket.createKey("myKey", 2000)) {
+    try (OzoneOutputStream out = bucket.createKey("myKey", inputSize)) {
       Assert.assertTrue(out.getOutputStream() instanceof KeyOutputStream);
       for (int i = 0; i < inputChunks.length; i++) {
         out.write(inputChunks[i]);
@@ -152,8 +144,18 @@ public class TestECKeyOutputStream {
 
   @Test
   public void testCreateKeyWithBucketDefaults() throws Exception {
-    OzoneBucket bucket = getOzoneBucket();
-    try (OzoneOutputStream out = bucket.createKey(keyString, 2000)) {
+    String myBucket = UUID.randomUUID().toString();
+    OzoneVolume volume = objectStore.getVolume(volumeName);
+    final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
+    bucketArgs.setDefaultReplicationConfig(
+        new DefaultReplicationConfig(ReplicationType.EC,
+            new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+                chunkSize)));
+
+    volume.createBucket(myBucket, bucketArgs.build());
+    OzoneBucket bucket = volume.getBucket(myBucket);
+
+    try (OzoneOutputStream out = bucket.createKey(keyString, inputSize)) {
       Assert.assertTrue(out.getOutputStream() instanceof ECKeyOutputStream);
       for (int i = 0; i < inputChunks.length; i++) {
         out.write(inputChunks[i]);
@@ -244,64 +246,6 @@ public class TestECKeyOutputStream {
     return volume.getBucket(myBucket);
   }
 
-
-
-  @Test
-  public void testECKeyXceiverClientShouldNotUseCachedKeysForDifferentStreams()
-      throws Exception {
-    int data = 3;
-    int parity = 2;
-    try (OzoneOutputStream key = TestHelper
-        .createKey(keyString, new ECReplicationConfig(data, parity,
-            ECReplicationConfig.EcCodec.RS, chunkSize), 1024,
-            objectStore, volumeName, bucketName)) {
-      final List<BlockOutputStreamEntry> streamEntries =
-          ((ECKeyOutputStream) key.getOutputStream()).getStreamEntries();
-      Assert.assertEquals(data + parity, streamEntries.size());
-      final Cache<String, XceiverClientSpi> clientCache =
-          ((XceiverClientManager) ((ECKeyOutputStream) key.getOutputStream())
-              .getXceiverClientFactory()).getClientCache();
-      clientCache.invalidateAll();
-      clientCache.cleanUp();
-      final Pipeline firstStreamPipeline = streamEntries.get(0).getPipeline();
-      XceiverClientSpi xceiverClientSpi =
-          ((ECKeyOutputStream) key.getOutputStream()).getXceiverClientFactory()
-              .acquireClient(firstStreamPipeline);
-      Assert.assertNotNull(xceiverClientSpi);
-      final String firstCacheKey =
-          clientCache.asMap().entrySet().iterator().next().getKey();
-      List<String> prevVisitedKeys = new ArrayList<>();
-      prevVisitedKeys.add(firstCacheKey);
-      // Lets look at all underlying EC Block group streams and make sure
-      // xceiver client entry is not repeating for all.
-      for (int i = 1; i < streamEntries.size(); i++) {
-        Pipeline pipeline = streamEntries.get(i).getPipeline();
-        Assert.assertEquals(i, clientCache.asMap().size());
-        xceiverClientSpi = ((ECKeyOutputStream) key.getOutputStream())
-            .getXceiverClientFactory().acquireClient(pipeline);
-        Assert.assertNotNull(xceiverClientSpi);
-        Assert.assertEquals(i + 1, clientCache.asMap().size());
-        final String newCacheKey =
-            getNewKey(clientCache.asMap().entrySet().iterator(),
-                prevVisitedKeys);
-        prevVisitedKeys.add(newCacheKey);
-        Assert.assertNotEquals(firstCacheKey, newCacheKey);
-      }
-    }
-  }
-
-  private String getNewKey(
-      Iterator<Map.Entry<String, XceiverClientSpi>> iterator,
-      List<String> prevVisitedKeys) {
-    while (iterator.hasNext()) {
-      final String key = iterator.next().getKey();
-      if (!prevVisitedKeys.contains(key)) {
-        return key;
-      }
-    }
-    return null;
-  }
-
   private static void initInputChunks() {
     for (int i = 0; i < dataBlocks; i++) {
       inputChunks[i] = getBytesWith(i + 1, chunkSize);

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to