This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new 90c4d92 HDDS-5470 : EC: Add padding and generate parity if the last
stripe is not full (#2455)
90c4d92 is described below
commit 90c4d9286d3279bd540a329acd4de64eed948778
Author: Uma Maheswara Rao G <[email protected]>
AuthorDate: Fri Aug 27 10:33:52 2021 -0700
HDDS-5470 : EC: Add padding and generate parity if the last stripe is not
full (#2455)
---
.../ozone/client/io/BlockOutputStreamEntry.java | 4 +
.../client/io/BlockOutputStreamEntryPool.java | 4 +-
.../ozone/client/io/ECBlockOutputStreamEntry.java | 17 +-
.../client/io/ECBlockOutputStreamEntryPool.java | 39 +++--
.../hadoop/ozone/client/io/ECKeyOutputStream.java | 171 +++++++++++++++------
.../hadoop/ozone/client/TestOzoneClient.java | 20 ++-
.../hadoop/ozone/client/TestOzoneECClient.java | 86 ++++++++++-
7 files changed, 271 insertions(+), 70 deletions(-)
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 2d151ed..ce12136 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -191,6 +191,10 @@ public class BlockOutputStreamEntry extends OutputStream {
}
+ boolean isInitialized() {
+ return outputStream != null;
+ }
+
/**
* Builder class for ChunkGroupOutputStreamEntry.
* */
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index 0af651c..5650006 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -299,8 +299,8 @@ public class BlockOutputStreamEntryPool {
this.currentStreamIndex = currIdx;
}
- public void updateToNextStream(int rotation){
- currentStreamIndex = (currentStreamIndex+1) % rotation;
+ public void updateToNextStream(int rotation) {
+ currentStreamIndex = (currentStreamIndex + 1) % rotation;
}
BlockOutputStreamEntry allocateBlockIfNeeded() throws IOException {
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 383ed17..3016060 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -32,14 +32,16 @@ import java.io.IOException;
* Helper for {@link ECBlockOutputStream}.
*/
public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
+ private final boolean isParityStreamEntry;
private ECBlockOutputStream out;
@SuppressWarnings({"parameternumber", "squid:S00107"})
ECBlockOutputStreamEntry(BlockID blockID, String key,
XceiverClientFactory xceiverClientManager, Pipeline pipeline, long
length,
BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
- OzoneClientConfig config) {
+ OzoneClientConfig config, boolean isParityStream) {
super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
token, config);
+ this.isParityStreamEntry = isParityStream;
}
@Override
@@ -53,6 +55,10 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry{
this.out.executePutBlock(false, true);
}
+ public boolean isParityStreamEntry() {
+ return this.isParityStreamEntry;
+ }
+
/**
* Builder class for ChunkGroupOutputStreamEntry.
* */
@@ -66,6 +72,7 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry{
private BufferPool bufferPool;
private Token<OzoneBlockTokenIdentifier> token;
private OzoneClientConfig config;
+ private boolean isParityStreamEntry;
public ECBlockOutputStreamEntry.Builder setBlockID(BlockID bID) {
this.blockID = bID;
@@ -113,6 +120,12 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry{
return this;
}
+ public ECBlockOutputStreamEntry.Builder setIsParityStreamEntry(
+ boolean isParity) {
+ this.isParityStreamEntry = isParity;
+ return this;
+ }
+
public ECBlockOutputStreamEntry build() {
return new ECBlockOutputStreamEntry(blockID,
key,
@@ -120,7 +133,7 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry{
pipeline,
length,
bufferPool,
- token, config);
+ token, config, isParityStreamEntry);
}
}
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
index ca76b75..3fd6bc6 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
@@ -84,7 +84,8 @@ public class ECBlockOutputStreamEntryPool extends
BlockOutputStreamEntryPool {
.setXceiverClientManager(getXceiverClientFactory())
.setPipeline(pipeline).setConfig(getConfig())
.setLength(subKeyInfo.getLength()).setBufferPool(getBufferPool())
- .setToken(subKeyInfo.getToken());
+ .setToken(subKeyInfo.getToken())
+ .setIsParityStreamEntry(i >= ecReplicationConfig.getData());
getStreamEntries().add(builder.build());
}
}
@@ -100,17 +101,15 @@ public class ECBlockOutputStreamEntryPool extends
BlockOutputStreamEntryPool {
return locationInfoList;
}
+ @Override
long getKeyLength() {
- long totalLength = getStreamEntries().stream().filter(c -> {
- return (c.getPipeline().getReplicaIndex(
- c.getPipeline().getNodes().iterator()
- .next())) <= ecReplicationConfig.getData();
- }).mapToLong(BlockOutputStreamEntry::getCurrentPosition).sum();
- totalLength += finishedStreamEntries.stream().filter(c -> {
- return (c.getPipeline().getReplicaIndex(
- c.getPipeline().getNodes().iterator()
- .next())) <= ecReplicationConfig.getData();
- }).mapToLong(BlockOutputStreamEntry::getCurrentPosition).sum();
+ long totalLength = getStreamEntries().stream()
+ .filter(c -> !((ECBlockOutputStreamEntry) c).isParityStreamEntry())
+ .mapToLong(BlockOutputStreamEntry::getCurrentPosition).sum();
+
+ totalLength += finishedStreamEntries.stream()
+ .filter(c -> !((ECBlockOutputStreamEntry) c).isParityStreamEntry())
+ .mapToLong(BlockOutputStreamEntry::getCurrentPosition).sum();
return totalLength;
}
@@ -130,8 +129,24 @@ public class ECBlockOutputStreamEntryPool extends
BlockOutputStreamEntryPool {
void executePutBlockForAll() throws IOException {
List<BlockOutputStreamEntry> streamEntries = getStreamEntries();
+ int failedStreams = 0;
for (int i = 0; i < streamEntries.size(); i++) {
- ((ECBlockOutputStreamEntry) streamEntries.get(i)).executePutBlock();
+ ECBlockOutputStreamEntry ecBlockOutputStreamEntry =
+ (ECBlockOutputStreamEntry) streamEntries.get(i);
+ if (!ecBlockOutputStreamEntry.isClosed()) {
+ if(!ecBlockOutputStreamEntry.isInitialized()){
+ // Stream not initialized. Means this stream was not used to write.
+ continue;
+ }
+ ecBlockOutputStreamEntry.executePutBlock();
+ }else{
+ failedStreams++;
+ }
+ }
+ if(failedStreams > ecReplicationConfig.getParity()) {
+ throw new IOException(
+ "There are " + failedStreams + " failures than supported tolerance: "
+ + ecReplicationConfig.getParity());
}
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 474dcab..2f1f815 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.client.io;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -192,74 +193,102 @@ public class ECKeyOutputStream extends KeyOutputStream {
int maxLenToCurrChunkBuffer = (int) Math.min(len, ecChunkSize);
int currentWriterChunkLenToWrite =
Math.min(currentChunkBufferRemainingLength, maxLenToCurrChunkBuffer);
- handleWrite(b, off, currentWriterChunkLenToWrite,
- currentChunkBufferLen + currentWriterChunkLenToWrite == ecChunkSize,
- false);
- checkAndWriteParityCells();
+ int pos = handleDataWrite(blockOutputStreamEntryPool.getCurrIdx(), b, off,
+ currentWriterChunkLenToWrite,
+ currentChunkBufferLen + currentWriterChunkLenToWrite == ecChunkSize);
+ checkAndWriteParityCells(pos);
int remLen = len - currentWriterChunkLenToWrite;
int iters = remLen / ecChunkSize;
int lastCellSize = remLen % ecChunkSize;
while (iters > 0) {
- handleWrite(b, off, ecChunkSize, true, false);
+ pos = handleDataWrite(blockOutputStreamEntryPool.getCurrIdx(), b, off,
+ ecChunkSize, true);
off += ecChunkSize;
iters--;
- checkAndWriteParityCells();
+ checkAndWriteParityCells(pos);
}
if (lastCellSize > 0) {
- handleWrite(b, off, lastCellSize, false, false);
- checkAndWriteParityCells();
+ pos = handleDataWrite(blockOutputStreamEntryPool.getCurrIdx(), b, off,
+ lastCellSize, false);
+ checkAndWriteParityCells(pos);
}
writeOffset += len;
}
- private void checkAndWriteParityCells() throws IOException {
+ private void checkAndWriteParityCells(int lastDataBuffPos)
+ throws IOException {
//check data blocks finished
//If index is > datanum blks
- if (blockOutputStreamEntryPool.getCurrIdx() == numDataBlks) {
+ if (blockOutputStreamEntryPool
+ .getCurrIdx() == numDataBlks && lastDataBuffPos == ecChunkSize) {
//Lets encode and write
- //encoder.encode();
- writeParityCells();
- // By this time, we should have finished full stripe. So, lets call
- // executePutBlock for all.
- // TODO: we should alter the put block calls to share CRC to each stream.
- blockOutputStreamEntryPool.executePutBlockForAll();
- ecChunkBufferCache.clear();
-
- // check if block ends?
- if (currentBlockGroupLen == numDataBlks * blockOutputStreamEntryPool
- .getStreamEntries().get(blockOutputStreamEntryPool.getCurrIdx())
- .getLength()) {
- blockOutputStreamEntryPool.endECBlock();
- currentBlockGroupLen = 0;
- }
+ handleParityWrites(ecChunkSize);
}
}
- void writeParityCells() throws IOException {
- final ByteBuffer[] buffers = ecChunkBufferCache.getDataBuffers();
- //encode the data cells
- for (int i = 0; i < numDataBlks; i++) {
- buffers[i].flip();
+ private void handleParityWrites(int parityCellSize) throws IOException {
+ writeParityCells(parityCellSize);
+ // By this time, we should have finished full stripe. So, lets call
+ // executePutBlock for all.
+ // TODO: we should alter the put block calls to share CRC to each stream.
+ blockOutputStreamEntryPool.executePutBlockForAll();
+ ecChunkBufferCache.clear();
+
+ // check if block ends?
+ if (shouldEndBlockGroup()) {
+ blockOutputStreamEntryPool.endECBlock();
+ currentBlockGroupLen = 0;
}
+ }
+
+ private boolean shouldEndBlockGroup() {
+ return currentBlockGroupLen == numDataBlks * blockOutputStreamEntryPool
+ .getStreamEntries().get(blockOutputStreamEntryPool.getCurrIdx())
+ .getLength();
+ }
+ void writeParityCells(int parityCellSize) throws IOException {
+ final ByteBuffer[] buffers = ecChunkBufferCache.getDataBuffers();
+ ecChunkBufferCache.allocateParityBuffers(parityCellSize);
final ByteBuffer[] parityBuffers = ecChunkBufferCache.getParityBuffers();
+
+ for(int i=0; i< buffers.length; i++){
+ buffers[i].flip();
+ }
encoder.encode(buffers, parityBuffers);
for (int i =
numDataBlks; i < (this.numDataBlks + this.numParityBlks); i++) {
- handleWrite(parityBuffers[i - numDataBlks].array(), 0, ecChunkSize, true,
- true);
+ // Move the stream entry cursor to parity block index
+ blockOutputStreamEntryPool.setCurrIdx(i);
+ handleParityWrite(i, parityBuffers[i - numDataBlks].array(), 0,
+ ecChunkSize, true);
}
}
- private void handleWrite(byte[] b, int off, long len, boolean isFullCell,
- boolean isParity) throws IOException {
- if (!isParity) {
- ecChunkBufferCache
- .addToDataBuffer(blockOutputStreamEntryPool.getCurrIdx(), b, off,
- (int) len);
+ private int handleDataWrite(int currIdx, byte[] b, int off, long len,
+ boolean isFullCell) throws IOException {
+ int pos = ecChunkBufferCache.addToDataBuffer(currIdx, b, off, (int) len);
+ handleOutputStreamWrite(currIdx, b, off, len, isFullCell, false);
+
+ if(pos == ecChunkSize){
+ blockOutputStreamEntryPool
+ .updateToNextStream(numDataBlks + numParityBlks);
}
+ return pos;
+ }
+
+ private void handleParityWrite(int currIdx, byte[] b, int off, long len,
+ boolean isFullCell) throws IOException {
+ handleOutputStreamWrite(currIdx, b, off, len, isFullCell, true);
+ blockOutputStreamEntryPool
+ .updateToNextStream(numDataBlks + numParityBlks);
+ }
+
+ private void handleOutputStreamWrite(int currIdx, byte[] b, int off, long
len,
+ boolean isFullCell, boolean isParity) throws IOException {
+
BlockOutputStreamEntry current =
blockOutputStreamEntryPool.allocateBlockIfNeeded();
int writeLengthToCurrStream =
@@ -273,10 +302,8 @@ public class ECKeyOutputStream extends KeyOutputStream {
len -= writeLengthToCurrStream;
if (isFullCell) {
ByteBuffer bytesToWrite = isParity ?
- ecChunkBufferCache.getParityBuffers()[blockOutputStreamEntryPool
- .getCurrIdx() - numDataBlks] :
- ecChunkBufferCache.getDataBuffers()[blockOutputStreamEntryPool
- .getCurrIdx()];
+ ecChunkBufferCache.getParityBuffers()[currIdx - numDataBlks] :
+ ecChunkBufferCache.getDataBuffers()[currIdx];
try {
writeToOutputStream(current, len, bytesToWrite.array(),
bytesToWrite.array().length, 0, current.getWrittenDataLength(),
@@ -284,9 +311,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
} catch (Exception e) {
markStreamClosed();
}
-
- blockOutputStreamEntryPool
- .updateToNextStream(numDataBlks + numParityBlks);
}
}
@@ -462,6 +486,33 @@ public class ECKeyOutputStream extends KeyOutputStream {
closed = true;
try {
handleFlushOrCloseAllStreams(StreamAction.CLOSE);
+ if(isPartialStripe()){
+ ByteBuffer bytesToWrite =
+ ecChunkBufferCache.getDataBuffers()[blockOutputStreamEntryPool
+ .getCurrIdx()];
+
+ // Finish writing the current partial cached chunk
+ if (bytesToWrite.position() % ecChunkSize != 0) {
+ final BlockOutputStreamEntry current =
+ blockOutputStreamEntryPool.getCurrentStreamEntry();
+ try {
+ byte[] array = bytesToWrite.array();
+ writeToOutputStream(current, bytesToWrite.position(), array,
+ bytesToWrite.position(), 0, current.getWrittenDataLength(),
+ false);
+ } catch (Exception e) {
+ markStreamClosed();
+ }
+ }
+ final int lastStripeSize =
+ (int) (currentBlockGroupLen % (numDataBlks * ecChunkSize));
+
+ final int parityCellSize =
+ lastStripeSize < ecChunkSize ? lastStripeSize : ecChunkSize;
+ addPadding(parityCellSize);
+ handleParityWrites(parityCellSize);
+ }
+
if (!isException) {
Preconditions.checkArgument(writeOffset == offset);
}
@@ -473,6 +524,31 @@ public class ECKeyOutputStream extends KeyOutputStream {
ecChunkBufferCache.release();
}
+ private void addPadding(int parityCellSize) {
+ ByteBuffer[] buffers = ecChunkBufferCache.getDataBuffers();
+
+ for (int i = 1; i < numDataBlks; i++) {
+ final int position = buffers[i].position();
+ assert position <= parityCellSize : "If an internal block is smaller"
+ + " than parity block, then its last cell should be small than last"
+ + " parity cell";
+ padBufferToLimit(buffers[i], parityCellSize);
+ }
+ }
+
+ public static void padBufferToLimit(ByteBuffer buf, int limit) {
+ int pos = buf.position();
+ if (pos >= limit) {
+ return;
+ }
+ Arrays.fill(buf.array(), pos, limit, (byte)0);
+ buf.position(limit);
+ }
+
+ private boolean isPartialStripe() {
+ return currentBlockGroupLen % (numDataBlks * ecChunkSize) > 0;
+ }
+
public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
return blockOutputStreamEntryPool.getCommitUploadPartInfo();
}
@@ -593,7 +669,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
dataBuffers = new ByteBuffer[this.dataBlks];
parityBuffers = new ByteBuffer[this.parityBlks];
allocateBuffers(cellSize, dataBuffers);
- allocateBuffers(cellSize, parityBuffers);
}
private ByteBuffer[] getDataBuffers() {
@@ -604,6 +679,10 @@ public class ECKeyOutputStream extends KeyOutputStream {
return parityBuffers;
}
+ public void allocateParityBuffers(int size){
+ allocateBuffers(size, parityBuffers);
+ }
+
private int addToDataBuffer(int i, byte[] b, int off, int len) {
final ByteBuffer buf = dataBuffers[i];
final int pos = buf.position() + len;
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
index e43401f..804831a 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
@@ -72,20 +72,23 @@ public class TestOzoneClient {
@Before
public void init() throws IOException {
ConfigurationSource config = new InMemoryConfiguration();
+ createNewClient(config, new SinglePipelineBlockAllocator());
+ }
+
+ private void createNewClient(ConfigurationSource config,
+ MockBlockAllocator blkAllocator) throws IOException {
client = new OzoneClient(config, new RpcClient(config, null) {
@Override
- protected OmTransport createOmTransport(
- String omServiceId)
+ protected OmTransport createOmTransport(String omServiceId)
throws IOException {
- return new MockOmTransport();
+ return new MockOmTransport(blkAllocator);
}
@NotNull
@Override
protected XceiverClientFactory createXceiverClientFactory(
- List<X509Certificate> x509Certificates)
- throws IOException {
+ List<X509Certificate> x509Certificates) throws IOException {
return new MockXceiverClientFactory();
}
});
@@ -186,6 +189,11 @@ public class TestOzoneClient {
@Test
public void testPutKeyWithECReplicationConfig() throws IOException {
+ close();
+ ConfigurationSource config = new InMemoryConfiguration();
+ int data = 3;
+ int parity = 2;
+ createNewClient(config, new MultiNodePipelineBlockAllocator(data +
parity));
String value = new String(new byte[1024], UTF_8);
OzoneBucket bucket = getOzoneBucket();
@@ -193,7 +201,7 @@ public class TestOzoneClient {
String keyName = UUID.randomUUID().toString();
try (OzoneOutputStream out = bucket
.createKey(keyName, value.getBytes(UTF_8).length,
- new ECReplicationConfig(3, 2), new HashMap<>())) {
+ new ECReplicationConfig(data, parity), new HashMap<>())) {
out.write(value.getBytes(UTF_8));
out.write(value.getBytes(UTF_8));
}
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index 70d4406..9591e27 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -69,6 +69,8 @@ public class TestOzoneECClient {
private OzoneClient client;
private ObjectStore store;
private String keyName = UUID.randomUUID().toString();
+ private String volumeName = UUID.randomUUID().toString();
+ private String bucketName = UUID.randomUUID().toString();
private byte[][] inputChunks = new byte[dataBlocks][chunkSize];
private final XceiverClientFactory factoryStub =
new MockXceiverClientFactory();
@@ -225,11 +227,91 @@ public class TestOzoneECClient {
}
}
+ @Test
+ public void testPartialStripeWithSingleChunkAndPadding() throws IOException {
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
+ try (OzoneOutputStream out = bucket.createKey(keyName, 2000,
+ new ECReplicationConfig(dataBlocks, parityBlocks), new HashMap<>())) {
+ for (int i = 0; i < inputChunks[0].length; i++) {
+ out.write(inputChunks[0][i]);
+ }
+ }
+
+ OzoneKey key = bucket.getKey(keyName);
+ Assert.assertEquals(keyName, key.getName());
+ try (OzoneInputStream is = bucket.readKey(keyName)) {
+ byte[] fileContent = new byte[1024];
+ Assert.assertEquals(inputChunks[0].length, is.read(fileContent));
+ Assert.assertEquals(new String(inputChunks[0], UTF_8),
+ new String(fileContent, UTF_8));
+ }
+ }
+
+ @Test
+ public void testPartialStripeLessThanSingleChunkWithPadding()
+ throws IOException {
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
+ try (OzoneOutputStream out = bucket.createKey(keyName, 2000,
+ new ECReplicationConfig(dataBlocks, parityBlocks), new HashMap<>())) {
+ for (int i = 0; i < inputChunks[0].length-1; i++) {
+ out.write(inputChunks[0][i]);
+ }
+ }
+
+ OzoneKey key = bucket.getKey(keyName);
+ Assert.assertEquals(keyName, key.getName());
+ try (OzoneInputStream is = bucket.readKey(keyName)) {
+ byte[] fileContent = new byte[1023];
+ Assert.assertEquals(inputChunks[0].length - 1, is.read(fileContent));
+ Assert.assertEquals(
+ new String(Arrays.copyOf(inputChunks[0], inputChunks[0].length - 1),
+ UTF_8), new String(fileContent, UTF_8));
+ }
+ }
+
+ @Test
+ public void testPartialStripeWithPartialLastChunk()
+ throws IOException {
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
+ byte[] lastChunk = inputChunks[inputChunks.length - 1];
+
+ try (OzoneOutputStream out = bucket.createKey(keyName, 2000,
+ new ECReplicationConfig(dataBlocks, parityBlocks), new HashMap<>())) {
+ for (int i = 0; i < inputChunks.length - 1; i++) {
+ out.write(inputChunks[i]);
+ }
+
+ for (int i = 0; i < lastChunk.length - 1; i++) {
+ out.write(lastChunk[i]);
+ }
+ }
+
+ // Making sure to keep only the 3rd node in pipeline, so that 3rd chunk can
+ // be read.
+ updatePipelineToKeepSingleNode(3);
+ try (OzoneInputStream is = bucket.readKey(keyName)) {
+ byte[] fileContent = new byte[1023];
+ Assert.assertEquals(lastChunk.length - 1, is.read(fileContent));
+ Assert.assertEquals(
+ new String(Arrays.copyOf(lastChunk, lastChunk.length - 1), UTF_8),
+ new String(fileContent, UTF_8));
+ }
+ }
private OzoneBucket writeIntoECKey(byte[][] chunks, String key,
DefaultReplicationConfig defaultReplicationConfig) throws IOException {
- String volumeName = UUID.randomUUID().toString();
- String bucketName = UUID.randomUUID().toString();
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
if (defaultReplicationConfig != null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]