fapifta commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r722104338
##########
File path:
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,53 +17,259 @@
*/
package org.apache.hadoop.ozone.client.io;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.util.Preconditions;
import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Helper for {@link ECBlockOutputStream}.
*/
public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
- private final boolean isParityStreamEntry;
- private ECBlockOutputStream out;
+ private ECBlockOutputStream[] blockOutputStreams;
+ private final ECReplicationConfig replicationConfig;
+ private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+
+ private int currentStreamIdx = 0;
@SuppressWarnings({"parameternumber", "squid:S00107"})
ECBlockOutputStreamEntry(BlockID blockID, String key,
XceiverClientFactory xceiverClientManager, Pipeline pipeline, long
length,
BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
- OzoneClientConfig config, boolean isParityStream) {
+ OzoneClientConfig config) {
super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
token, config);
- this.isParityStreamEntry = isParityStream;
+ Preconditions.assertInstanceOf(
+ pipeline.getReplicationConfig(), ECReplicationConfig.class);
+ this.replicationConfig =
+ (ECReplicationConfig) pipeline.getReplicationConfig();
+ }
+
+ @Override
+ void createOutputStream() throws IOException {
+ Pipeline ecPipeline = getPipeline();
Review comment:
ECBlockOutputStreamEntry will manage the individual streams. In order
for it to be able to create the streams and also to handle preallocations, we
need to have the full EC Pipeline here, as that will provide us with all the
necessary data to create the internal single-replica pipelines (and with them
the BlockOutputStreams) to write the data to, and to report the key location
info back to OM at the end of the write during commitKey.
I think one change is necessary here: instead of getPipeline() in
the next line, I should have used ecPipeline to reference the full pipeline,
making it obvious where the nodes are coming from, and differentiating it from
the single-replica pipelines created by the createSingleECBlockPipeline()
method, which provides them to the internal block output streams 8 lines below
here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]