fapifta commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r722111946



##########
File path: 
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,53 +17,259 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.util.Preconditions;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long 
length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    Preconditions.assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+  }
+
+  @Override
+  void createOutputStream() throws IOException {
+    Pipeline ecPipeline = getPipeline();
+    List<DatanodeDetails> nodes = getPipeline().getNodes();
+    blockOutputStreams =
+        new ECBlockOutputStream[nodes.size()];
+    for (int i = 0; i< getPipeline().getNodes().size(); i++) {
+      blockOutputStreams[i] = new ECBlockOutputStream(
+          getBlockID(),
+          getXceiverClientManager(),
+          createSingleECBlockPipeline(ecPipeline, nodes.get(i), i+1),
+          getBufferPool(),
+          getConf(),
+          getToken());
+    }
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    if (!isInitialized()) {
+      return null;
+    }
+    return blockOutputStreams[currentStreamIdx];
+  }
+
+  @Override
+  boolean isInitialized() {
+    return blockOutputStreams != null;
+  }
+
+  public int getCurrentStreamIdx() {
+    return currentStreamIdx;
+  }
+
+  public void useNextBlockStream() {
+    currentStreamIdx++;
+  }
+
+  public void forceToFirstParityBlock(){
+    currentStreamIdx = replicationConfig.getData();
+  }
+
+  @Override
+  void incCurrentPosition() {
+    if (isWritingParity()) {
+      return;
+    }
+    super.incCurrentPosition();
+  }
+
+  @Override
+  void incCurrentPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    super.incCurrentPosition(len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for(int i=0; i<=currentStreamIdx && i<blockOutputStreams.length; i++) {
+      blockOutputStreams[i].flush();
+    }
+  }
+
+  @Override
+  boolean isClosed() {
+    if (!isInitialized()) {
+      return false;
+    }

Review comment:
       Initialization became a two-stage process here. In the base 
BlockOutputStreamEntry class there is the checkStream() method, which is the 
only place where we call createOutputStream(). The createOutputStream() method 
initializes the data and parity ECBlockOutputStreams in one go, and places 
them into the blockOutputStreams array.
   So this class is initialized if we have any non-null element in the 
blockOutputStreams array; therefore isInitialized() checks whether the first 
element of that array is non-null, and returns true if it is.
   
   The ECBlockOutputStreams themselves are cheap to create, and unless 
something is written to one, it does not initiate any communication towards 
the DataNodes, nor does it do anything at close or flush.
   With that, I believe we are fine initializing all the streams at once in 
createOutputStream(), and throwing some away unused at the end if we do not 
need them. Creating them on demand might be an optimization later, but I am 
unsure yet whether we need to gain those microseconds at the time we start 
writing after a block allocation, while we can spare the complexity that 
instantiating the streams on demand brings in.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to