This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new 38e4cca HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to
accommodate all block group related ECBlockOutputStreams. (#2702)
38e4cca is described below
commit 38e4cca27367bd43631211402b9c203784fa18fa
Author: Istvan Fajth <[email protected]>
AuthorDate: Thu Oct 14 07:39:47 2021 +0200
HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block
group related ECBlockOutputStreams. (#2702)
---
.../hdds/scm/storage/ECBlockOutputStream.java | 6 +
.../ozone/client/io/BlockOutputStreamEntry.java | 17 +-
.../client/io/BlockOutputStreamEntryPool.java | 18 +-
.../ozone/client/io/ECBlockOutputStreamEntry.java | 265 +++++++++++++++++++--
.../client/io/ECBlockOutputStreamEntryPool.java | 201 ++--------------
.../hadoop/ozone/client/io/ECKeyOutputStream.java | 127 ++++------
.../hadoop/ozone/client/io/KeyOutputStream.java | 1 -
.../client/MultiNodePipelineBlockAllocator.java | 3 +
.../hadoop/ozone/client/TestOzoneECClient.java | 29 ++-
.../client/io/TestECBlockOutputStreamEntry.java | 128 ++++++++++
.../ozone/client/rpc/TestECKeyOutputStream.java | 88 ++-----
11 files changed, 492 insertions(+), 391 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index 30bd2b7..86d7058 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -115,4 +115,10 @@ public class ECBlockOutputStream extends BlockOutputStream{
}
return flushFuture;
}
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ cleanup(false);
+ }
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index bb821b2..1fd89f6 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -42,7 +42,7 @@ import com.google.common.annotations.VisibleForTesting;
*
* The base implementation is handling Ratis-3 writes, with a single stream,
* but there can be other implementations that are using a different way.
- * */
+ */
public class BlockOutputStreamEntry extends OutputStream {
private final OzoneClientConfig config;
@@ -276,7 +276,8 @@ public class BlockOutputStreamEntry extends OutputStream {
* here.
* @param id the last know ID of the block.
*/
- void updateBlockID(BlockID id) {
+ @VisibleForTesting
+ protected void updateBlockID(BlockID id) {
this.blockID = id;
}
@@ -297,18 +298,6 @@ public class BlockOutputStreamEntry extends OutputStream {
return this.pipeline;
}
- /**
- * Gets the Pipeline based on which the location report can be sent to the
OM.
- * This is necessary, as implementors might use special pipeline information
- * that can be created during commit, but not during initialization,
- * and might need to update some Pipeline information returned in
- * OMKeyLocationInfo.
- * @return
- */
- Pipeline getPipelineForOMLocationReport(){
- return getPipeline();
- }
-
long getCurrentPosition() {
return this.currentPosition;
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index 9c4414f..912ba3a 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -184,7 +184,7 @@ public class BlockOutputStreamEntryPool {
.build();
}
- void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
+ private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
Preconditions.checkNotNull(subKeyInfo.getPipeline());
streamEntries.add(createStreamEntry(subKeyInfo));
}
@@ -203,7 +203,7 @@ public class BlockOutputStreamEntryPool {
return locationInfoList;
}
- List<OmKeyLocationInfo> getOmKeyLocationInfos(
+ private List<OmKeyLocationInfo> getOmKeyLocationInfos(
List<BlockOutputStreamEntry> streams) {
List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
for (BlockOutputStreamEntry streamEntry : streams) {
@@ -217,7 +217,7 @@ public class BlockOutputStreamEntryPool {
.setLength(streamEntry.getCurrentPosition())
.setOffset(0)
.setToken(streamEntry.getToken())
- .setPipeline(streamEntry.getPipelineForOMLocationReport())
+ .setPipeline(streamEntry.getPipeline())
.build();
locationInfoList.add(info);
}
@@ -345,18 +345,6 @@ public class BlockOutputStreamEntryPool {
}
}
- public int getCurrIdx(){
- return currentStreamIndex;
- }
-
- public void setCurrIdx(int currIdx) {
- this.currentStreamIndex = currIdx;
- }
-
- public void updateToNextStream(int rotation) {
- currentStreamIndex = (currentStreamIndex + 1) % rotation;
- }
-
/**
* Allocates a new block with OM if the current stream is closed, and new
* writes are to be handled.
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index df04204..22a1906 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -17,56 +17,286 @@
*/
package org.apache.hadoop.ozone.client.io;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.ratis.util.Preconditions.assertInstanceOf;
/**
- * Helper for {@link ECBlockOutputStream}.
+ * ECBlockOutputStreamEntry manages write into EC keys' data block groups.
+ * A block group consists of data and parity blocks. For every block we have
+ * an internal ECBlockOutputStream instance with a single node pipeline, that
+ * is derived from the original EC pipeline.
*/
public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
- private final boolean isParityStreamEntry;
- private ECBlockOutputStream out;
+ private final ECReplicationConfig replicationConfig;
+ private final long length;
+
+ private ECBlockOutputStream[] blockOutputStreams;
+ private int currentStreamIdx = 0;
+
@SuppressWarnings({"parameternumber", "squid:S00107"})
ECBlockOutputStreamEntry(BlockID blockID, String key,
XceiverClientFactory xceiverClientManager, Pipeline pipeline, long
length,
BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
- OzoneClientConfig config, boolean isParityStream) {
+ OzoneClientConfig config) {
super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
token, config);
- this.isParityStreamEntry = isParityStream;
+ assertInstanceOf(
+ pipeline.getReplicationConfig(), ECReplicationConfig.class);
+ this.replicationConfig =
+ (ECReplicationConfig) pipeline.getReplicationConfig();
+ this.length = replicationConfig.getData() * length;
+ }
+
+ @Override
+ void checkStream() throws IOException {
+ if (!isInitialized()) {
+ blockOutputStreams =
+ new ECBlockOutputStream[replicationConfig.getRequiredNodes()];
+ }
+ if (blockOutputStreams[currentStreamIdx] == null) {
+ createOutputStream();
+ }
}
@Override
void createOutputStream() throws IOException {
- this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
- getPipeline(), getBufferPool(), getConf(), getToken());
+ Pipeline ecPipeline = getPipeline();
+ List<DatanodeDetails> nodes = getPipeline().getNodes();
+ blockOutputStreams[currentStreamIdx] = new ECBlockOutputStream(
+ getBlockID(),
+ getXceiverClientManager(),
+ createSingleECBlockPipeline(
+ ecPipeline, nodes.get(currentStreamIdx), currentStreamIdx + 1),
+ getBufferPool(),
+ getConf(),
+ getToken());
+ }
+
+ @Override
+ public OutputStream getOutputStream() {
+ if (!isInitialized()) {
+ return null;
+ }
+ checkState(blockOutputStreams[currentStreamIdx] != null);
+ return blockOutputStreams[currentStreamIdx];
+ }
+
+ @Override
+ boolean isInitialized() {
+ return blockOutputStreams != null;
+ }
+
+ @Override
+ public long getLength() {
+ return length;
+ }
+
+ public int getCurrentStreamIdx() {
+ return currentStreamIdx;
+ }
+
+ public void useNextBlockStream() {
+ currentStreamIdx++;
+ }
+
+ public void forceToFirstParityBlock(){
+ currentStreamIdx = replicationConfig.getData();
}
- public ECBlockOutputStream getOutputStream() {
- return out;
+ public void resetToFirstEntry(){
+ currentStreamIdx = 0;
+ }
+
+ @Override
+ void incCurrentPosition() {
+ if (isWritingParity()) {
+ return;
+ }
+ super.incCurrentPosition();
+ }
+
+ @Override
+ void incCurrentPosition(long len) {
+ if (isWritingParity()){
+ return;
+ }
+ super.incCurrentPosition(len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (!isInitialized()) {
+ return;
+ }
+ for (int i = 0;
+ i <= currentStreamIdx && i < blockOutputStreams.length; i++) {
+ if (blockOutputStreams[i] != null) {
+ blockOutputStreams[i].flush();
+ }
+ }
+ }
+
+ @Override
+ boolean isClosed() {
+ if (!isInitialized()) {
+ return false;
+ }
+ return blockStreams().allMatch(BlockOutputStream::isClosed);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!isInitialized()) {
+ return;
+ }
+ for (ECBlockOutputStream stream : blockOutputStreams) {
+ if (stream != null) {
+ stream.close();
+ }
+ }
+ updateBlockID(underlyingBlockID());
+ }
+
+ @Override
+ long getTotalAckDataLength() {
+ if (!isInitialized()) {
+ return 0;
+ }
+ updateBlockID(underlyingBlockID());
+ // Returning zero here. Underlying streams in EC entry are
+ // ECBlockOutputStreams, extending from BlockOutputStream, without
+ // overriding getTotalAckDataLength, and default implementation returns
+ // constant zero, so even summarizing the return value of this method
+ // from blockStreams entries would yield to 0. Once this changes, we need
+ // to revisit this, and implement a proper sum of data or all streams.
+ return 0;
+ }
+
+ /**
+ * Returns the amount of bytes that were attempted to be sent through towards
+ * the DataNodes, and the write call succeeded without an exception.
+ * In EC entries the parity writes does not count into this, as the written
+ * data length represents the attempts of the classes using the entry, and
+ * not the attempts of the entry itself.
+ * @return 0 if the stream is not initialized, the amount of data bytes that
+ * were attempted to be written to the entry.
+ */
+ //TODO: this might become problematic, and should be tested during the
+ // implementation of retries and error handling, as if there is a retry,
+ // then some data might have to be written twice.
+ // This current implementation is an assumption here.
+ // We might need to account the parity bytes written here, or elsewhere.
+ @Override
+ long getWrittenDataLength() {
+ if (!isInitialized()) {
+ return 0;
+ }
+ return dataStreams()
+ .mapToLong(BlockOutputStream::getWrittenDataLength)
+ .sum();
+ }
+
+ @Override
+ Collection<DatanodeDetails> getFailedServers() {
+ if (!isInitialized()) {
+ return Collections.emptyList();
+ }
+
+ return blockStreams()
+ .flatMap(outputStream -> outputStream.getFailedServers().stream())
+ .collect(Collectors.toList());
+ }
+
+ @VisibleForTesting
+ Pipeline createSingleECBlockPipeline(Pipeline ecPipeline,
+ DatanodeDetails node, int replicaIndex) {
+ Map<DatanodeDetails, Integer> indiciesForSinglePipeline = new HashMap<>();
+ indiciesForSinglePipeline.put(node, replicaIndex);
+ return Pipeline.newBuilder()
+ .setId(ecPipeline.getId())
+ .setReplicationConfig(ecPipeline.getReplicationConfig())
+ .setState(ecPipeline.getPipelineState())
+ .setNodes(ImmutableList.of(node))
+ .setReplicaIndexes(indiciesForSinglePipeline)
+ .build();
}
void executePutBlock() throws IOException {
- this.out.executePutBlock(false, true);
+ if (!isInitialized()) {
+ return;
+ }
+ int failedStreams = 0;
+ for (ECBlockOutputStream stream : blockOutputStreams) {
+ if (stream == null) {
+ continue;
+ }
+ if (!stream.isClosed()) {
+ stream.executePutBlock(false, true);
+ } else {
+ failedStreams++;
+ }
+ if(failedStreams > replicationConfig.getParity()) {
+ throw new IOException(
+ "There are " + failedStreams + " block write failures,"
+ + " supported tolerance: " + replicationConfig.getParity());
+ }
+ }
+ }
+
+ private BlockID underlyingBlockID() {
+ if (blockOutputStreams[0] == null) {
+ return null;
+ }
+ // blockID is the same for EC blocks inside one block group managed by
+ // this entry, so updating based on the first stream, as when we write any
+ // data that is surely exists.
+ return blockOutputStreams[0].getBlockID();
+ }
+
+ private boolean isWritingParity() {
+ return currentStreamIdx >= replicationConfig.getData();
+ }
+
+ private Stream<ECBlockOutputStream> blockStreams() {
+ return Arrays.stream(blockOutputStreams).filter(Objects::nonNull);
}
- public boolean isParityStreamEntry() {
- return this.isParityStreamEntry;
+ private Stream<ECBlockOutputStream> dataStreams() {
+ return Arrays.stream(blockOutputStreams)
+ .limit(replicationConfig.getData())
+ .filter(Objects::nonNull);
}
/**
* Builder class for ChunkGroupOutputStreamEntry.
* */
public static class Builder {
-
private BlockID blockID;
private String key;
private XceiverClientFactory xceiverClientManager;
@@ -75,7 +305,6 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry{
private BufferPool bufferPool;
private Token<OzoneBlockTokenIdentifier> token;
private OzoneClientConfig config;
- private boolean isParityStreamEntry;
public ECBlockOutputStreamEntry.Builder setBlockID(BlockID bID) {
this.blockID = bID;
@@ -123,12 +352,6 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry{
return this;
}
- public ECBlockOutputStreamEntry.Builder setIsParityStreamEntry(
- boolean isParity) {
- this.isParityStreamEntry = isParity;
- return this;
- }
-
public ECBlockOutputStreamEntry build() {
return new ECBlockOutputStreamEntry(blockID,
key,
@@ -136,7 +359,7 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry{
pipeline,
length,
bufferPool,
- token, config, isParityStreamEntry);
+ token, config);
}
}
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
index aa27a72..e379956 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
@@ -17,33 +17,27 @@
*/
package org.apache.hadoop.ozone.client.io;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
/**
- * This class manages the stream entries list and handles block allocation
- * from OzoneManager for EC writes.
+ * {@link BlockOutputStreamEntryPool} is responsible to manage OM communication
+ * regarding writing a block to Ozone in a non-EC write case.
+ * The basic operations are fine for us but we need a specific
+ * {@link ECBlockOutputStreamEntry} implementation to handle writing EC block
+ * groups, this class implements the logic that handles the specific EC
entries'
+ * instantiation and retrieval from the pool.
+ *
+ * @see ECKeyOutputStream
+ * @see BlockOutputStreamEntryPool
+ * @see ECBlockOutputStreamEntry
*/
public class ECBlockOutputStreamEntryPool extends BlockOutputStreamEntryPool {
- private final List<BlockOutputStreamEntry> finishedStreamEntries;
- private final ECReplicationConfig ecReplicationConfig;
@SuppressWarnings({"parameternumber", "squid:S00107"})
public ECBlockOutputStreamEntryPool(OzoneClientConfig config,
@@ -60,173 +54,26 @@ public class ECBlockOutputStreamEntryPool extends
BlockOutputStreamEntryPool {
super(config, omClient, requestId, replicationConfig, uploadID, partNumber,
isMultipart, info, unsafeByteBufferConversion, xceiverClientFactory,
openID);
- this.finishedStreamEntries = new ArrayList<>();
assert replicationConfig instanceof ECReplicationConfig;
- this.ecReplicationConfig = (ECReplicationConfig) replicationConfig;
}
@Override
- void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
- Preconditions.checkNotNull(subKeyInfo.getPipeline());
- List<DatanodeDetails> nodes = subKeyInfo.getPipeline().getNodes();
- for (int i = 0; i < nodes.size(); i++) {
- List<DatanodeDetails> nodeStatus = new ArrayList<>();
- nodeStatus.add(nodes.get(i));
- Map<DatanodeDetails, Integer> nodeVsIdx = new HashMap<>();
- nodeVsIdx.put(nodes.get(i), i + 1);
- Pipeline pipeline =
- Pipeline.newBuilder().setId(subKeyInfo.getPipeline().getId())
- .setReplicationConfig(
- subKeyInfo.getPipeline().getReplicationConfig())
- .setState(subKeyInfo.getPipeline().getPipelineState())
- .setNodes(nodeStatus).setReplicaIndexes(nodeVsIdx).build();
-
- ECBlockOutputStreamEntry.Builder builder =
- new ECBlockOutputStreamEntry.Builder()
- .setBlockID(subKeyInfo.getBlockID()).setKey(getKeyName())
- .setXceiverClientManager(getXceiverClientFactory())
- .setPipeline(pipeline).setConfig(getConfig())
- .setLength(subKeyInfo.getLength()).setBufferPool(getBufferPool())
- .setToken(subKeyInfo.getToken())
- .setIsParityStreamEntry(i >= ecReplicationConfig.getData());
- getStreamEntries().add(builder.build());
- }
- }
-
- public List<OmKeyLocationInfo> getLocationInfoList() {
- List<OmKeyLocationInfo> locationInfoList;
- List<OmKeyLocationInfo> currBlocksLocationInfoList =
- getOmKeyLocationInfos(getStreamEntries());
- List<OmKeyLocationInfo> prevBlksKeyLocationInfos =
- getOmKeyLocationInfos(finishedStreamEntries);
- prevBlksKeyLocationInfos.addAll(currBlocksLocationInfoList);
- locationInfoList = prevBlksKeyLocationInfos;
- return locationInfoList;
+ BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) {
+ return
+ new ECBlockOutputStreamEntry.Builder()
+ .setBlockID(subKeyInfo.getBlockID())
+ .setKey(getKeyName())
+ .setXceiverClientManager(getXceiverClientFactory())
+ .setPipeline(subKeyInfo.getPipeline())
+ .setConfig(getConfig())
+ .setLength(subKeyInfo.getLength())
+ .setBufferPool(getBufferPool())
+ .setToken(subKeyInfo.getToken())
+ .build();
}
@Override
- long getKeyLength() {
- long totalLength = getStreamEntries().stream()
- .filter(c -> !((ECBlockOutputStreamEntry) c).isParityStreamEntry())
- .mapToLong(BlockOutputStreamEntry::getCurrentPosition).sum();
-
- totalLength += finishedStreamEntries.stream()
- .filter(c -> !((ECBlockOutputStreamEntry) c).isParityStreamEntry())
- .mapToLong(BlockOutputStreamEntry::getCurrentPosition).sum();
- return totalLength;
+ public ECBlockOutputStreamEntry getCurrentStreamEntry() {
+ return (ECBlockOutputStreamEntry) super.getCurrentStreamEntry();
}
-
- @Override
- List<OmKeyLocationInfo> getOmKeyLocationInfos(
- List<BlockOutputStreamEntry> streams) {
- List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
- Map<BlockID, ArrayList<ECBlockOutputStreamEntry>> blkIdVsStream =
- new LinkedHashMap<>();
-
- for (BlockOutputStreamEntry streamEntry : streams) {
- BlockID blkID = streamEntry.getBlockID();
- final ArrayList<ECBlockOutputStreamEntry> stream =
- blkIdVsStream.getOrDefault(blkID, new ArrayList<>());
- stream.add((ECBlockOutputStreamEntry) streamEntry);
- blkIdVsStream.put(blkID, stream);
- }
-
- final Iterator<Map.Entry<BlockID, ArrayList<ECBlockOutputStreamEntry>>>
- iterator = blkIdVsStream.entrySet().iterator();
-
- while (iterator.hasNext()) {
- final Map.Entry<BlockID, ArrayList<ECBlockOutputStreamEntry>>
- blkGrpIDVsStreams = iterator.next();
- final ArrayList<ECBlockOutputStreamEntry> blkGrpStreams =
- blkGrpIDVsStreams.getValue();
- List<DatanodeDetails> nodeStatus = new ArrayList<>();
- Map<DatanodeDetails, Integer> nodeVsIdx = new HashMap<>();
-
- // Assumption: Irrespective of failures, stream entries must have updated
- // the lengths.
- long blkGRpLen = 0;
- for (ECBlockOutputStreamEntry internalBlkStream : blkGrpStreams) {
- blkGRpLen += !(internalBlkStream).isParityStreamEntry() ?
- internalBlkStream.getCurrentPosition() :
- 0;
- // In EC, only one node per internal block stream.
- final DatanodeDetails nodeDetails =
- internalBlkStream.getPipeline().getNodeSet().iterator().next();
- nodeStatus.add(nodeDetails);
- nodeVsIdx.put(nodeDetails,
- internalBlkStream.getPipeline().getReplicaIndex(nodeDetails));
- }
- nodeStatus.sort((o1, o2) -> nodeVsIdx.get(o1) - nodeVsIdx.get(o2));
- final BlockOutputStreamEntry firstStreamInBlockGrp =
blkGrpStreams.get(0);
- Pipeline blockGrpPipeline = Pipeline.newBuilder()
- .setId(firstStreamInBlockGrp.getPipeline().getId())
- .setReplicationConfig(
- firstStreamInBlockGrp.getPipeline().getReplicationConfig())
- .setState(firstStreamInBlockGrp.getPipeline().getPipelineState())
- .setNodes(nodeStatus).setReplicaIndexes(nodeVsIdx).build();
- // Commit only those blocks to OzoneManager which are not empty
- if (blkGRpLen != 0) {
- OmKeyLocationInfo info = new OmKeyLocationInfo.Builder()
- .setBlockID(blkGrpIDVsStreams.getKey()).setLength(blkGRpLen)
- .setOffset(0).setToken(firstStreamInBlockGrp.getToken())
- .setPipeline(blockGrpPipeline).build();
- locationInfoList.add(info);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("block written " + firstStreamInBlockGrp
- .getBlockID() + ", length " + blkGRpLen + " bcsID "
- + firstStreamInBlockGrp.getBlockID().getBlockCommitSequenceId());
- }
- }
- return locationInfoList;
- }
-
- public void endECBlock() throws IOException {
- List<BlockOutputStreamEntry> entries = getStreamEntries();
- if (entries.size() > 0) {
- for (int i = entries.size() - 1; i >= 0; i--) {
- finishedStreamEntries.add(entries.remove(i));
- }
- }
-
- for (BlockOutputStreamEntry entry : finishedStreamEntries) {
- entry.close();
- }
- super.cleanup();
- }
-
- void executePutBlockForAll() throws IOException {
- List<BlockOutputStreamEntry> streamEntries = getStreamEntries();
- int failedStreams = 0;
- for (int i = 0; i < streamEntries.size(); i++) {
- ECBlockOutputStreamEntry ecBlockOutputStreamEntry =
- (ECBlockOutputStreamEntry) streamEntries.get(i);
- if (!ecBlockOutputStreamEntry.isClosed()) {
- if(!ecBlockOutputStreamEntry.isInitialized()){
- // Stream not initialized. Means this stream was not used to write.
- continue;
- }
- ecBlockOutputStreamEntry.executePutBlock();
- }else{
- failedStreams++;
- }
- }
- if(failedStreams > ecReplicationConfig.getParity()) {
- throw new IOException(
- "There are " + failedStreams + " failures than supported tolerance: "
- + ecReplicationConfig.getParity());
- }
- }
-
- void cleanupAll() {
- super.cleanup();
- if (finishedStreamEntries != null) {
- finishedStreamEntries.clear();
- }
- }
-
- public void updateToNextStream(int rotation) {
- super.setCurrIdx((getCurrIdx() + 1) % rotation);
- }
-
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index ec9402c..799e15d 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -24,7 +24,6 @@ import java.util.Collection;
import java.util.List;
import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -78,7 +77,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
LoggerFactory.getLogger(KeyOutputStream.class);
private boolean closed;
- private FileEncryptionInfo feInfo;
// how much of data is actually written yet to underlying stream
private long offset;
// how much data has been ingested into the stream
@@ -126,9 +124,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
replicationConfig, uploadID, partNumber, isMultipart, info,
unsafeByteBufferConversion, xceiverClientManager, handler.getId());
- // Retrieve the file encryption key info, null if file is not in
- // encrypted bucket.
- this.feInfo = info.getFileEncryptionInfo();
this.isException = false;
this.writeOffset = 0;
OzoneConfiguration conf = new OzoneConfiguration();
@@ -181,71 +176,81 @@ public class ECKeyOutputStream extends KeyOutputStream {
if (len == 0) {
return;
}
+ blockOutputStreamEntryPool.allocateBlockIfNeeded();
+ int currentStreamIdx = blockOutputStreamEntryPool.getCurrentStreamEntry()
+ .getCurrentStreamIdx();
int currentChunkBufferRemainingLength =
- ecChunkBufferCache.dataBuffers[blockOutputStreamEntryPool.getCurrIdx()]
- .remaining();
+ ecChunkBufferCache.dataBuffers[currentStreamIdx].remaining();
int currentChunkBufferLen =
- ecChunkBufferCache.dataBuffers[blockOutputStreamEntryPool.getCurrIdx()]
+ ecChunkBufferCache.dataBuffers[currentStreamIdx]
.position();
- int maxLenToCurrChunkBuffer = (int) Math.min(len, ecChunkSize);
+ int maxLenToCurrChunkBuffer = Math.min(len, ecChunkSize);
int currentWriterChunkLenToWrite =
Math.min(currentChunkBufferRemainingLength, maxLenToCurrChunkBuffer);
- int pos = handleDataWrite(blockOutputStreamEntryPool.getCurrIdx(), b, off,
+ int pos = handleDataWrite(currentStreamIdx, b, off,
currentWriterChunkLenToWrite,
currentChunkBufferLen + currentWriterChunkLenToWrite == ecChunkSize);
- checkAndWriteParityCells(pos);
+ //TODO: do we really need this call?
+ checkAndWriteParityCells(pos, false);
int remLen = len - currentWriterChunkLenToWrite;
int iters = remLen / ecChunkSize;
int lastCellSize = remLen % ecChunkSize;
off += currentWriterChunkLenToWrite;
while (iters > 0) {
- pos = handleDataWrite(blockOutputStreamEntryPool.getCurrIdx(), b, off,
- ecChunkSize, true);
+ currentStreamIdx = blockOutputStreamEntryPool.getCurrentStreamEntry()
+ .getCurrentStreamIdx();
+ pos = handleDataWrite(currentStreamIdx, b, off, ecChunkSize, true);
off += ecChunkSize;
iters--;
- checkAndWriteParityCells(pos);
+ checkAndWriteParityCells(pos, iters > 0 || remLen > 0);
}
if (lastCellSize > 0) {
- pos = handleDataWrite(blockOutputStreamEntryPool.getCurrIdx(), b, off,
+ currentStreamIdx = blockOutputStreamEntryPool.getCurrentStreamEntry()
+ .getCurrentStreamIdx();
+ pos = handleDataWrite(currentStreamIdx, b, off,
lastCellSize, false);
- checkAndWriteParityCells(pos);
+ checkAndWriteParityCells(pos, false);
}
writeOffset += len;
}
- private void checkAndWriteParityCells(int lastDataBuffPos)
+ private void checkAndWriteParityCells(int lastDataBuffPos,
+ boolean allocateBlockIfFull)
throws IOException {
//check data blocks finished
//If index is > datanum blks
- if (blockOutputStreamEntryPool
- .getCurrIdx() == numDataBlks && lastDataBuffPos == ecChunkSize) {
+ int currentStreamIdx = blockOutputStreamEntryPool.getCurrentStreamEntry()
+ .getCurrentStreamIdx();
+ if (currentStreamIdx == numDataBlks && lastDataBuffPos == ecChunkSize) {
//Lets encode and write
- handleParityWrites(ecChunkSize);
+ handleParityWrites(ecChunkSize, allocateBlockIfFull);
}
}
- private void handleParityWrites(int parityCellSize) throws IOException {
+ private void handleParityWrites(int parityCellSize,
+ boolean allocateBlockIfFull)
+ throws IOException {
writeParityCells(parityCellSize);
// By this time, we should have finished full stripe. So, lets call
// executePutBlock for all.
// TODO: we should alter the put block calls to share CRC to each stream.
- blockOutputStreamEntryPool.executePutBlockForAll();
+ ECBlockOutputStreamEntry streamEntry =
+ blockOutputStreamEntryPool.getCurrentStreamEntry();
+ streamEntry.executePutBlock();
ecChunkBufferCache.clear(parityCellSize);
- // check if block ends?
- if (shouldEndBlockGroup()) {
- blockOutputStreamEntryPool.endECBlock();
- currentBlockGroupLen = 0;
+ if (streamEntry.getRemaining() <= 0) {
+ streamEntry.close();
+ if (allocateBlockIfFull) {
+ blockOutputStreamEntryPool.allocateBlockIfNeeded();
+ }
+ } else {
+ streamEntry.resetToFirstEntry();
}
- }
-
- private boolean shouldEndBlockGroup() {
- return currentBlockGroupLen == numDataBlks * blockOutputStreamEntryPool
- .getStreamEntries().get(blockOutputStreamEntryPool.getCurrIdx())
- .getLength();
+ currentBlockGroupLen = 0;
}
void writeParityCells(int parityCellSize) throws IOException {
@@ -257,10 +262,11 @@ public class ECKeyOutputStream extends KeyOutputStream {
buffers[i].flip();
}
encoder.encode(buffers, parityBuffers);
+ blockOutputStreamEntryPool
+ .getCurrentStreamEntry().forceToFirstParityBlock();
for (int i =
numDataBlks; i < (this.numDataBlks + this.numParityBlks); i++) {
// Move the stream entry cursor to parity block index
- blockOutputStreamEntryPool.setCurrIdx(i);
handleParityWrite(i, parityBuffers[i - numDataBlks].array(), 0,
ecChunkSize, true);
}
@@ -272,8 +278,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
handleOutputStreamWrite(currIdx, b, off, len, isFullCell, false);
if(pos == ecChunkSize){
- blockOutputStreamEntryPool
- .updateToNextStream(numDataBlks + numParityBlks);
+ blockOutputStreamEntryPool.getCurrentStreamEntry().useNextBlockStream();
}
return pos;
}
@@ -281,22 +286,17 @@ public class ECKeyOutputStream extends KeyOutputStream {
private void handleParityWrite(int currIdx, byte[] b, int off, long len,
boolean isFullCell) throws IOException {
handleOutputStreamWrite(currIdx, b, off, len, isFullCell, true);
- blockOutputStreamEntryPool
- .updateToNextStream(numDataBlks + numParityBlks);
+ blockOutputStreamEntryPool.getCurrentStreamEntry().useNextBlockStream();
}
private void handleOutputStreamWrite(int currIdx, byte[] b, int off, long
len,
boolean isFullCell, boolean isParity) throws IOException {
BlockOutputStreamEntry current =
- blockOutputStreamEntryPool.allocateBlockIfNeeded();
+ blockOutputStreamEntryPool.getCurrentStreamEntry();
int writeLengthToCurrStream =
Math.min((int) len, (int) current.getRemaining());
currentBlockGroupLen += isParity ? 0 : writeLengthToCurrStream;
- if (current.getRemaining() <= 0) {
- // since the current block is already written close the stream.
- closeCurrentStream(StreamAction.CLOSE);
- }
len -= writeLengthToCurrStream;
if (isFullCell) {
@@ -421,32 +421,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
}
}
- private void closeCurrentStream(StreamAction op) throws IOException {
- if (!blockOutputStreamEntryPool.isEmpty()) {
- List<BlockOutputStreamEntry> allStreamEntries =
- blockOutputStreamEntryPool.getStreamEntries();
- for (int i = 0; i < allStreamEntries.size(); i++) {
- while (true) {
- try {
- BlockOutputStreamEntry entry = allStreamEntries.get(i);
- if (entry != null) {
- try {
- handleStreamAction(entry, op);
- } catch (IOException ioe) {
- handleException(entry, ioe);
- continue;
- }
- }
- return;
- } catch (Exception e) {
- markStreamClosed();
- throw e;
- }
- }
- }
- }
- }
-
private void handleStreamAction(BlockOutputStreamEntry entry, StreamAction
op)
throws IOException {
Collection<DatanodeDetails> failedServers = entry.getFailedServers();
@@ -484,11 +458,10 @@ public class ECKeyOutputStream extends KeyOutputStream {
}
closed = true;
try {
- handleFlushOrCloseAllStreams(StreamAction.CLOSE);
if(isPartialStripe()){
ByteBuffer bytesToWrite =
ecChunkBufferCache.getDataBuffers()[blockOutputStreamEntryPool
- .getCurrIdx()];
+ .getCurrentStreamEntry().getCurrentStreamIdx()];
// Finish writing the current partial cached chunk
if (bytesToWrite.position() % ecChunkSize != 0) {
@@ -509,16 +482,17 @@ public class ECKeyOutputStream extends KeyOutputStream {
final int parityCellSize =
lastStripeSize < ecChunkSize ? lastStripeSize : ecChunkSize;
addPadding(parityCellSize);
- handleParityWrites(parityCellSize);
+ handleParityWrites(parityCellSize, false);
}
+ handleFlushOrCloseAllStreams(StreamAction.CLOSE);
if (!isException) {
Preconditions.checkArgument(writeOffset == offset);
}
- blockOutputStreamEntryPool.endECBlock();
+ blockOutputStreamEntryPool.getCurrentStreamEntry().close();
blockOutputStreamEntryPool.commitKey(offset);
} finally {
- blockOutputStreamEntryPool.cleanupAll();
+ blockOutputStreamEntryPool.cleanup();
}
ecChunkBufferCache.release();
}
@@ -552,10 +526,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
return blockOutputStreamEntryPool.getCommitUploadPartInfo();
}
- public FileEncryptionInfo getFileEncryptionInfo() {
- return feInfo;
- }
-
@VisibleForTesting
public ExcludeList getExcludeList() {
return blockOutputStreamEntryPool.getExcludeList();
@@ -685,7 +655,8 @@ public class ECKeyOutputStream extends KeyOutputStream {
private int addToDataBuffer(int i, byte[] b, int off, int len) {
final ByteBuffer buf = dataBuffers[i];
final int pos = buf.position() + len;
- Preconditions.checkState(pos <= cellSize);
+ Preconditions.checkState(pos <= cellSize,
+ "Position("+pos+") is greater than the cellSize("+cellSize+").");
buf.put(b, off, len);
return pos;
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index d5f6f5d..4e0fbf9 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -150,7 +150,6 @@ public class KeyOutputStream extends OutputStream {
unsafeByteBufferConversion,
xceiverClientManager,
handler.getId());
-
this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException(
config.getMaxRetryCount(), config.getRetryInterval());
this.retryCount = 0;
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
index f5d2de3..2c315b7 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
@@ -61,6 +61,9 @@ public class MultiNodePipelineBlockAllocator implements
MockBlockAllocator {
.setIpAddress("1.2.3.4").addPorts(
HddsProtos.Port.newBuilder().setName("RATIS").setValue(1234 +
i)
.build()).build());
+ if (keyArgs.getType() == HddsProtos.ReplicationType.EC) {
+ builder.addMemberReplicaIndexes(i);
+ }
}
if (keyArgs.getType() == HddsProtos.ReplicationType.EC) {
builder.setEcReplicationConfig(keyArgs.getEcReplicationConfig());
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index 1ba25f3..c66d0f6 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -66,6 +66,7 @@ public class TestOzoneECClient {
private int chunkSize = 1024;
private int dataBlocks = 3;
private int parityBlocks = 2;
+ private int inputSize = chunkSize * dataBlocks;
private OzoneClient client;
private ObjectStore store;
private String keyName = UUID.randomUUID().toString();
@@ -151,13 +152,13 @@ public class TestOzoneECClient {
@Test
public void testPutECKeyAndCheckParityData() throws IOException {
OzoneBucket bucket = writeIntoECKey(inputChunks, keyName, null);
- final ByteBuffer[] dataBuffers = new ByteBuffer[3];
+ final ByteBuffer[] dataBuffers = new ByteBuffer[dataBlocks];
for (int i = 0; i < inputChunks.length; i++) {
dataBuffers[i] = ByteBuffer.wrap(inputChunks[i]);
}
final ByteBuffer[] parityBuffers = new ByteBuffer[parityBlocks];
for (int i = 0; i < parityBlocks; i++) {
- parityBuffers[i] = ByteBuffer.allocate(1024);
+ parityBuffers[i] = ByteBuffer.allocate(chunkSize);
}
encoder.encode(dataBuffers, parityBuffers);
OzoneKey key = bucket.getKey(keyName);
@@ -186,7 +187,7 @@ public class TestOzoneECClient {
OzoneKey key = bucket.getKey(keyName);
Assert.assertEquals(keyName, key.getName());
try (OzoneInputStream is = bucket.readKey(keyName)) {
- byte[] fileContent = new byte[1024];
+ byte[] fileContent = new byte[chunkSize];
for (int i=0; i<dataBlocks; i++) {
Assert.assertEquals(inputChunks[i].length, is.read(fileContent));
Assert.assertTrue(Arrays.equals(inputChunks[i], fileContent));
@@ -206,7 +207,7 @@ public class TestOzoneECClient {
// create key without mentioning replication config. Since we set EC
// replication in bucket, key should be EC key.
- try (OzoneOutputStream out = bucket.createKey("mykey", 2000)) {
+ try (OzoneOutputStream out = bucket.createKey("mykey", inputSize)) {
Assert.assertTrue(out.getOutputStream() instanceof ECKeyOutputStream);
for (int i = 0; i < inputChunks.length; i++) {
out.write(inputChunks[i]);
@@ -278,7 +279,7 @@ public class TestOzoneECClient {
}
@Test
- public void testMultipleChunksWithPartialChunkInSigleWripeOp()
+ public void testMultipleChunksWithPartialChunkInSingleWriteOp()
throws IOException {
final int partialChunkLen = 10;
final int numFullChunks = 9;
@@ -310,7 +311,7 @@ public class TestOzoneECClient {
// create key without mentioning replication config. Since we set EC
// replication in bucket, key should be EC key.
- try (OzoneOutputStream out = bucket.createKey("mykey", 1024)) {
+ try (OzoneOutputStream out = bucket.createKey("mykey", 6*inputSize)) {
Assert.assertTrue(out.getOutputStream() instanceof ECKeyOutputStream);
// Block Size is 2kb, so to create 3 blocks we need 6 iterations here
for (int j = 0; j < 6; j++) {
@@ -348,7 +349,7 @@ public class TestOzoneECClient {
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
- try (OzoneOutputStream out = bucket.createKey(keyName, 2000,
+ try (OzoneOutputStream out = bucket.createKey(keyName, inputSize,
new ECReplicationConfig(dataBlocks, parityBlocks,
ECReplicationConfig.EcCodec.RS, chunkSize), new HashMap<>())) {
for (int i = 0; i < inputChunks[0].length; i++) {
@@ -367,7 +368,7 @@ public class TestOzoneECClient {
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
- try (OzoneOutputStream out = bucket.createKey(keyName, 2000,
+ try (OzoneOutputStream out = bucket.createKey(keyName, inputSize,
new ECReplicationConfig(dataBlocks, parityBlocks,
ECReplicationConfig.EcCodec.RS, chunkSize), new HashMap<>())) {
for (int i = 0; i < inputChunks[0].length-1; i++) {
@@ -392,7 +393,8 @@ public class TestOzoneECClient {
Arrays.copyOf(inputChunks[inputChunks.length - 1],
inputChunks[inputChunks.length - 1].length - 1);
- try (OzoneOutputStream out = bucket.createKey(keyName, 2000,
+ int inSize = chunkSize * (inputChunks.length - 1) + lastChunk.length;
+ try (OzoneOutputStream out = bucket.createKey(keyName, inSize,
new ECReplicationConfig(dataBlocks, parityBlocks,
ECReplicationConfig.EcCodec.RS, chunkSize), new HashMap<>())) {
for (int i = 0; i < inputChunks.length - 1; i++) {
@@ -405,7 +407,7 @@ public class TestOzoneECClient {
}
try (OzoneInputStream is = bucket.readKey(keyName)) {
- byte[] fileContent = new byte[1024];
+ byte[] fileContent = new byte[chunkSize];
for (int i=0; i<2; i++) {
Assert.assertEquals(inputChunks[i].length, is.read(fileContent));
Assert.assertTrue(Arrays.equals(inputChunks[i], fileContent));
@@ -436,9 +438,10 @@ public class TestOzoneECClient {
}
OzoneBucket bucket = volume.getBucket(bucketName);
- try (OzoneOutputStream out = bucket.createKey(key, 4096,
- new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
- chunkSize), new HashMap<>())) {
+ int size = (int) Arrays.stream(chunks).mapToLong(a -> a.length).sum();
+ try (OzoneOutputStream out = bucket.createKey(key, size,
+ new ECReplicationConfig(dataBlocks, parityBlocks,
+ ECReplicationConfig.EcCodec.RS, chunkSize), new HashMap<>())) {
for (int i = 0; i < chunks.length; i++) {
out.write(chunks[i]);
}
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockOutputStreamEntry.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockOutputStreamEntry.java
new file mode 100644
index 0000000..4b87ef7
--- /dev/null
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockOutputStreamEntry.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * {@link ECBlockOutputStreamEntry} tests.
+ */
+public class TestECBlockOutputStreamEntry {
+
+ @Test
+ public void
+ testAcquireDifferentClientForECBlocksOnTheSameHostButDifferentPort()
+ throws IOException {
+ PipelineID randomId = PipelineID.randomId();
+ ReplicationConfig ecReplicationConfig =
+ new ECReplicationConfig("RS-3-2-1024k");
+ DatanodeDetails node1 = aNode("127.0.0.1", "localhost", 2001);
+ DatanodeDetails node2 = aNode("127.0.0.1", "localhost", 2002);
+ DatanodeDetails node3 = aNode("127.0.0.1", "localhost", 2003);
+ DatanodeDetails node4 = aNode("127.0.0.1", "localhost", 2004);
+ DatanodeDetails node5 = aNode("127.0.0.1", "localhost", 2005);
+ List<DatanodeDetails> nodes =
+ Arrays.asList(node1, node2, node3, node4, node5);
+ Pipeline anECPipeline = Pipeline.newBuilder()
+ .setId(randomId)
+ .setReplicationConfig(ecReplicationConfig)
+ .setState(Pipeline.PipelineState.OPEN)
+ .setNodes(nodes)
+ .build();
+ XceiverClientManager manager =
+ new XceiverClientManager(new OzoneConfiguration());
+ HashSet<XceiverClientSpi> clients = new HashSet<>();
+ ECBlockOutputStreamEntry entry = new ECBlockOutputStreamEntry.Builder()
+ .setXceiverClientManager(manager)
+ .setPipeline(anECPipeline)
+ .build();
+ for (int i = 0; i < nodes.size(); i++) {
+ clients.add(
+ manager.acquireClient(
+ entry.createSingleECBlockPipeline(
+ anECPipeline, nodes.get(i), i
+ )));
+ }
+ assertEquals(5, clients.size());
+ }
+
+ @Test
+  public void
+      testAcquireDifferentClientForECBlocksOnTheSameHostWithSomeOnSamePortAlso()
+ throws IOException {
+ PipelineID randomId = PipelineID.randomId();
+ ReplicationConfig ecReplicationConfig =
+ new ECReplicationConfig("RS-3-2-1024k");
+ DatanodeDetails node1 = aNode("127.0.0.1", "localhost", 2001);
+ DatanodeDetails node2 = aNode("127.0.0.1", "localhost", 2001);
+ DatanodeDetails node3 = aNode("127.0.0.1", "localhost", 2003);
+ DatanodeDetails node4 = aNode("127.0.0.1", "localhost", 2001);
+ DatanodeDetails node5 = aNode("127.0.0.1", "localhost", 2005);
+ List<DatanodeDetails> nodes =
+ Arrays.asList(node1, node2, node3, node4, node5);
+ Pipeline anECPipeline = Pipeline.newBuilder()
+ .setId(randomId)
+ .setReplicationConfig(ecReplicationConfig)
+ .setState(Pipeline.PipelineState.OPEN)
+ .setNodes(nodes)
+ .build();
+ XceiverClientManager manager =
+ new XceiverClientManager(new OzoneConfiguration());
+ HashSet<XceiverClientSpi> clients = new HashSet<>();
+ ECBlockOutputStreamEntry entry = new ECBlockOutputStreamEntry.Builder()
+ .setXceiverClientManager(manager)
+ .setPipeline(anECPipeline)
+ .build();
+ for (int i = 0; i < nodes.size(); i++) {
+ clients.add(
+ manager.acquireClient(
+ entry.createSingleECBlockPipeline(
+ anECPipeline, nodes.get(i), i
+ )));
+ }
+ assertEquals(3, clients.size());
+    assertEquals(1, clients.stream().filter(c -> c.getRefcount() == 3).count());
+    assertEquals(2, clients.stream().filter(c -> c.getRefcount() == 1).count());
+ }
+
+ private DatanodeDetails aNode(String ip, String hostName, int port) {
+ return DatanodeDetails.newBuilder()
+ .setUuid(UUID.randomUUID())
+ .setIpAddress(ip)
+ .setHostName(hostName)
+ .addPort(
+            DatanodeDetails.newPort(DatanodeDetails.Port.Name.STANDALONE, port))
+ .build();
+ }
+}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index 84ccd44..6f12b62 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@ -16,7 +16,6 @@
*/
package org.apache.hadoop.ozone.client.rpc;
-import com.google.common.cache.Cache;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
@@ -25,9 +24,6 @@ import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.BucketArgs;
@@ -37,7 +33,6 @@ import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneKey;
import org.apache.hadoop.ozone.client.OzoneVolume;
-import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
@@ -49,12 +44,8 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -79,6 +70,7 @@ public class TestECKeyOutputStream {
private static String keyString;
private static int dataBlocks = 3;
private static int parityBlocks = 2;
+ private static int inputSize = dataBlocks * chunkSize;
private static byte[][] inputChunks = new byte[dataBlocks][chunkSize];
/**
@@ -132,8 +124,8 @@ public class TestECKeyOutputStream {
public void testCreateKeyWithECReplicationConfig() throws Exception {
try (OzoneOutputStream key = TestHelper
.createKey(keyString, new ECReplicationConfig(3, 2,
- ECReplicationConfig.EcCodec.RS, chunkSize), 2000, objectStore,
- volumeName, bucketName)) {
+ ECReplicationConfig.EcCodec.RS, chunkSize), inputSize,
+ objectStore, volumeName, bucketName)) {
Assert.assertTrue(key.getOutputStream() instanceof ECKeyOutputStream);
}
}
@@ -142,7 +134,7 @@ public class TestECKeyOutputStream {
public void testCreateKeyWithOutBucketDefaults() throws Exception {
OzoneVolume volume = objectStore.getVolume(volumeName);
OzoneBucket bucket = volume.getBucket(bucketName);
- try (OzoneOutputStream out = bucket.createKey("myKey", 2000)) {
+ try (OzoneOutputStream out = bucket.createKey("myKey", inputSize)) {
Assert.assertTrue(out.getOutputStream() instanceof KeyOutputStream);
for (int i = 0; i < inputChunks.length; i++) {
out.write(inputChunks[i]);
@@ -152,8 +144,18 @@ public class TestECKeyOutputStream {
@Test
public void testCreateKeyWithBucketDefaults() throws Exception {
- OzoneBucket bucket = getOzoneBucket();
- try (OzoneOutputStream out = bucket.createKey(keyString, 2000)) {
+ String myBucket = UUID.randomUUID().toString();
+ OzoneVolume volume = objectStore.getVolume(volumeName);
+ final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
+ bucketArgs.setDefaultReplicationConfig(
+ new DefaultReplicationConfig(ReplicationType.EC,
+ new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+ chunkSize)));
+
+ volume.createBucket(myBucket, bucketArgs.build());
+ OzoneBucket bucket = volume.getBucket(myBucket);
+
+ try (OzoneOutputStream out = bucket.createKey(keyString, inputSize)) {
Assert.assertTrue(out.getOutputStream() instanceof ECKeyOutputStream);
for (int i = 0; i < inputChunks.length; i++) {
out.write(inputChunks[i]);
@@ -244,64 +246,6 @@ public class TestECKeyOutputStream {
return volume.getBucket(myBucket);
}
-
-
- @Test
- public void testECKeyXceiverClientShouldNotUseCachedKeysForDifferentStreams()
- throws Exception {
- int data = 3;
- int parity = 2;
- try (OzoneOutputStream key = TestHelper
- .createKey(keyString, new ECReplicationConfig(data, parity,
- ECReplicationConfig.EcCodec.RS, chunkSize), 1024,
- objectStore, volumeName, bucketName)) {
- final List<BlockOutputStreamEntry> streamEntries =
- ((ECKeyOutputStream) key.getOutputStream()).getStreamEntries();
- Assert.assertEquals(data + parity, streamEntries.size());
- final Cache<String, XceiverClientSpi> clientCache =
- ((XceiverClientManager) ((ECKeyOutputStream) key.getOutputStream())
- .getXceiverClientFactory()).getClientCache();
- clientCache.invalidateAll();
- clientCache.cleanUp();
- final Pipeline firstStreamPipeline = streamEntries.get(0).getPipeline();
- XceiverClientSpi xceiverClientSpi =
- ((ECKeyOutputStream) key.getOutputStream()).getXceiverClientFactory()
- .acquireClient(firstStreamPipeline);
- Assert.assertNotNull(xceiverClientSpi);
- final String firstCacheKey =
- clientCache.asMap().entrySet().iterator().next().getKey();
- List<String> prevVisitedKeys = new ArrayList<>();
- prevVisitedKeys.add(firstCacheKey);
- // Lets look at all underlying EC Block group streams and make sure
- // xceiver client entry is not repeating for all.
- for (int i = 1; i < streamEntries.size(); i++) {
- Pipeline pipeline = streamEntries.get(i).getPipeline();
- Assert.assertEquals(i, clientCache.asMap().size());
- xceiverClientSpi = ((ECKeyOutputStream) key.getOutputStream())
- .getXceiverClientFactory().acquireClient(pipeline);
- Assert.assertNotNull(xceiverClientSpi);
- Assert.assertEquals(i + 1, clientCache.asMap().size());
- final String newCacheKey =
- getNewKey(clientCache.asMap().entrySet().iterator(),
- prevVisitedKeys);
- prevVisitedKeys.add(newCacheKey);
- Assert.assertNotEquals(firstCacheKey, newCacheKey);
- }
- }
- }
-
- private String getNewKey(
- Iterator<Map.Entry<String, XceiverClientSpi>> iterator,
- List<String> prevVisitedKeys) {
- while (iterator.hasNext()) {
- final String key = iterator.next().getKey();
- if (!prevVisitedKeys.contains(key)) {
- return key;
- }
- }
- return null;
- }
-
private static void initInputChunks() {
for (int i = 0; i < dataBlocks; i++) {
inputChunks[i] = getBytesWith(i + 1, chunkSize);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]