fapifta commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r722115875
##########
File path:
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,53 +17,259 @@
*/
package org.apache.hadoop.ozone.client.io;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.util.Preconditions;
import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Helper for {@link ECBlockOutputStream}.
*/
public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
- private final boolean isParityStreamEntry;
- private ECBlockOutputStream out;
+ private ECBlockOutputStream[] blockOutputStreams;
+ private final ECReplicationConfig replicationConfig;
+ private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+
+ private int currentStreamIdx = 0;
@SuppressWarnings({"parameternumber", "squid:S00107"})
ECBlockOutputStreamEntry(BlockID blockID, String key,
XceiverClientFactory xceiverClientManager, Pipeline pipeline, long
length,
BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
- OzoneClientConfig config, boolean isParityStream) {
+ OzoneClientConfig config) {
super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
token, config);
- this.isParityStreamEntry = isParityStream;
+ Preconditions.assertInstanceOf(
+ pipeline.getReplicationConfig(), ECReplicationConfig.class);
+ this.replicationConfig =
+ (ECReplicationConfig) pipeline.getReplicationConfig();
+ }
+
+ @Override
+ void createOutputStream() throws IOException {
+ Pipeline ecPipeline = getPipeline();
+ List<DatanodeDetails> nodes = getPipeline().getNodes();
+ blockOutputStreams =
+ new ECBlockOutputStream[nodes.size()];
+ for (int i = 0; i< getPipeline().getNodes().size(); i++) {
+ blockOutputStreams[i] = new ECBlockOutputStream(
+ getBlockID(),
+ getXceiverClientManager(),
+ createSingleECBlockPipeline(ecPipeline, nodes.get(i), i+1),
+ getBufferPool(),
+ getConf(),
+ getToken());
+ }
+ }
+
+ @Override
+ public OutputStream getOutputStream() {
+ if (!isInitialized()) {
+ return null;
+ }
+ return blockOutputStreams[currentStreamIdx];
+ }
+
+ @Override
+ boolean isInitialized() {
+ return blockOutputStreams != null;
+ }
+
+ public int getCurrentStreamIdx() {
+ return currentStreamIdx;
+ }
+
+ public void useNextBlockStream() {
+ currentStreamIdx++;
+ }
+
+ public void forceToFirstParityBlock(){
+ currentStreamIdx = replicationConfig.getData();
+ }
+
+ @Override
+ void incCurrentPosition() {
+ if (isWritingParity()) {
+ return;
+ }
+ super.incCurrentPosition();
+ }
+
+ @Override
+ void incCurrentPosition(long len) {
+ if (isWritingParity()){
+ return;
+ }
+ super.incCurrentPosition(len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (!isInitialized()) {
+ return;
+ }
+ for(int i=0; i<=currentStreamIdx && i<blockOutputStreams.length; i++) {
+ blockOutputStreams[i].flush();
+ }
+ }
+
+ @Override
+ boolean isClosed() {
+ if (!isInitialized()) {
+ return false;
+ }
+ return blockStreams().allMatch(BlockOutputStream::isClosed);
}
@Override
- ECBlockOutputStream createOutputStream() throws IOException {
- this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
- getPipeline(), getBufferPool(), getConf(), getToken());
- return this.out;
+ public void close() throws IOException {
+ if (!isInitialized()) {
+ return;
+ }
+ for (ECBlockOutputStream stream : blockOutputStreams) {
+ stream.close();
+ }
+ updateBlockID(underlyingBlockID());
+ }
+
+ @Override
+ long getTotalAckDataLength() {
+ if (!isInitialized()) {
+ return 0;
+ }
+ // blockID is the same for EC blocks inside one block group managed by
+ // this entry.
+ updateBlockID(underlyingBlockID());
+ //TODO: A future implementation might require something like this, but
+ // currently as ECBlockOutputStream is inheriting from BlockOutputStream
+ // this method returns 0 all the time from the unrelying streams.
+ // After we have a confirmed ack mechanism, like there is in
+ // RatisBlockOutputStream, we should revisit this part, and decide if we
+ // want to filter out parity here for example.
+// return blockStreams()
+// .mapToLong(BlockOutputStream::getTotalAckDataLength)
+// .sum();
Review comment:
ECBlockOutputStream extends and extended BlockOutputStream before.
Earlier ECBlockOutputStream did not override this method, so the method in
BlockOutputStream was called. That method returned 0 hardwired.
RatisBlockOutputStream on the other hand maintains the acked data length via
CommitWatcher, and returns that.
Our EC logic does not hold or check for any ack from the DNs via the grpc
client, and does not maintain the acked legth so far, and it seems to be safer
for now to just return 0 instead of calling the method of underlying streams,
as we may not do this via those streams later on. I have added the naive
implementation for discussion. I am unsure if the acked length should account
for the parity block acks or for just the data block acks either.
And in any case, even if we sum the results, we will get 0 anyways, but it
is easier to spot this way that the logic is not yet developed to calculate the
acked length. Did I miss something?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]