This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new 90c4d92  HDDS-5470 : EC: Add padding and generate parity if the last stripe is not full (#2455)
90c4d92 is described below

commit 90c4d9286d3279bd540a329acd4de64eed948778
Author: Uma Maheswara Rao G <[email protected]>
AuthorDate: Fri Aug 27 10:33:52 2021 -0700

    HDDS-5470 : EC: Add padding and generate parity if the last stripe is not full (#2455)
---
 .../ozone/client/io/BlockOutputStreamEntry.java    |   4 +
 .../client/io/BlockOutputStreamEntryPool.java      |   4 +-
 .../ozone/client/io/ECBlockOutputStreamEntry.java  |  17 +-
 .../client/io/ECBlockOutputStreamEntryPool.java    |  39 +++--
 .../hadoop/ozone/client/io/ECKeyOutputStream.java  | 171 +++++++++++++++------
 .../hadoop/ozone/client/TestOzoneClient.java       |  20 ++-
 .../hadoop/ozone/client/TestOzoneECClient.java     |  86 ++++++++++-
 7 files changed, 271 insertions(+), 70 deletions(-)
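
For orientation, the change in one line: when a key ends mid-stripe, the
remaining data cells are zero-padded up to the size of the largest cell in
that last stripe, and parity cells are generated at that (possibly shrunken)
cell size rather than at the full ecChunkSize. A minimal standalone sketch of
that rule, with illustrative names rather than the exact Ozone API:

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    public final class LastStripeSketch {
      public static void main(String[] args) {
        int numDataBlks = 3;
        int ecChunkSize = 1024;
        long currentBlockGroupLen = 2L * ecChunkSize + 100; // partial stripe

        long lastStripeSize =
            currentBlockGroupLen % ((long) numDataBlks * ecChunkSize);
        // Parity cells shrink to the largest data cell of the last stripe.
        int parityCellSize = (int) Math.min(lastStripeSize, ecChunkSize);
        System.out.println("lastStripeSize=" + lastStripeSize
            + ", parityCellSize=" + parityCellSize); // 2148, 1024
      }

      // Zero-fill a cell up to 'limit'; mirrors padBufferToLimit below.
      static void padBufferToLimit(ByteBuffer buf, int limit) {
        int pos = buf.position();
        if (pos >= limit) {
          return;
        }
        Arrays.fill(buf.array(), pos, limit, (byte) 0);
        buf.position(limit);
      }
    }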

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 2d151ed..ce12136 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -191,6 +191,10 @@ public class BlockOutputStreamEntry extends OutputStream {
 
   }
 
+  boolean isInitialized() {
+    return outputStream != null;
+  }
+
   /**
    * Builder class for ChunkGroupOutputStreamEntry.
    * */
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index 0af651c..5650006 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -299,8 +299,8 @@ public class BlockOutputStreamEntryPool {
     this.currentStreamIndex = currIdx;
   }
 
-  public void updateToNextStream(int rotation){
-    currentStreamIndex = (currentStreamIndex+1) % rotation;
+  public void updateToNextStream(int rotation) {
+    currentStreamIndex = (currentStreamIndex + 1) % rotation;
   }
 
   BlockOutputStreamEntry allocateBlockIfNeeded() throws IOException {
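
The round-robin arithmetic above drives the writer across all data and parity
streams in the block group. An illustrative trace, assuming EC(3, 2):

    public final class RotationSketch {
      public static void main(String[] args) {
        int rotation = 3 + 2; // numDataBlks + numParityBlks
        int currentStreamIndex = 0;
        // Prints 0 1 2 3 4 0 1: the index cycles over all five streams.
        for (int step = 0; step < 7; step++) {
          System.out.print(currentStreamIndex + " ");
          currentStreamIndex = (currentStreamIndex + 1) % rotation;
        }
      }
    }
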
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 383ed17..3016060 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -32,14 +32,16 @@ import java.io.IOException;
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
+  private final boolean isParityStreamEntry;
   private ECBlockOutputStream out;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
      XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config) {
+      OzoneClientConfig config, boolean isParityStream) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
+    this.isParityStreamEntry = isParityStream;
   }
 
   @Override
@@ -53,6 +55,10 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
     this.out.executePutBlock(false, true);
   }
 
+  public boolean isParityStreamEntry() {
+    return this.isParityStreamEntry;
+  }
+
   /**
    * Builder class for ChunkGroupOutputStreamEntry.
    * */
@@ -66,6 +72,7 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
     private BufferPool bufferPool;
     private Token<OzoneBlockTokenIdentifier> token;
     private OzoneClientConfig config;
+    private boolean isParityStreamEntry;
 
     public ECBlockOutputStreamEntry.Builder setBlockID(BlockID bID) {
       this.blockID = bID;
@@ -113,6 +120,12 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
       return this;
     }
 
+    public ECBlockOutputStreamEntry.Builder setIsParityStreamEntry(
+        boolean isParity) {
+      this.isParityStreamEntry = isParity;
+      return this;
+    }
+
     public ECBlockOutputStreamEntry build() {
       return new ECBlockOutputStreamEntry(blockID,
           key,
@@ -120,7 +133,7 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
           pipeline,
           length,
           bufferPool,
-          token, config);
+          token, config, isParityStreamEntry);
     }
   }
 }
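
With the new flag, whether an entry carries data or parity is fixed at
construction time from its index in the block group; the pool applies the
test shown in the next hunk (setIsParityStreamEntry(i >= data)). A quick
illustration:

    public final class ParityIndexSketch {
      public static void main(String[] args) {
        int data = 3;
        int parity = 2;
        // For EC(d, p), entries 0..d-1 carry data, entries d..d+p-1 parity.
        for (int i = 0; i < data + parity; i++) {
          System.out.println("entry " + i + " -> "
              + (i >= data ? "parity" : "data"));
        }
      }
    }
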
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
index ca76b75..3fd6bc6 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
@@ -84,7 +84,8 @@ public class ECBlockOutputStreamEntryPool extends BlockOutputStreamEntryPool {
               .setXceiverClientManager(getXceiverClientFactory())
               .setPipeline(pipeline).setConfig(getConfig())
               .setLength(subKeyInfo.getLength()).setBufferPool(getBufferPool())
-              .setToken(subKeyInfo.getToken());
+              .setToken(subKeyInfo.getToken())
+              .setIsParityStreamEntry(i >= ecReplicationConfig.getData());
       getStreamEntries().add(builder.build());
     }
   }
@@ -100,17 +101,15 @@ public class ECBlockOutputStreamEntryPool extends BlockOutputStreamEntryPool {
     return locationInfoList;
   }
 
+  @Override
   long getKeyLength() {
-    long totalLength = getStreamEntries().stream().filter(c -> {
-      return (c.getPipeline().getReplicaIndex(
-          c.getPipeline().getNodes().iterator()
-              .next())) <= ecReplicationConfig.getData();
-    }).mapToLong(BlockOutputStreamEntry::getCurrentPosition).sum();
-    totalLength += finishedStreamEntries.stream().filter(c -> {
-      return (c.getPipeline().getReplicaIndex(
-          c.getPipeline().getNodes().iterator()
-              .next())) <= ecReplicationConfig.getData();
-    }).mapToLong(BlockOutputStreamEntry::getCurrentPosition).sum();
+    long totalLength = getStreamEntries().stream()
+        .filter(c -> !((ECBlockOutputStreamEntry) c).isParityStreamEntry())
+        .mapToLong(BlockOutputStreamEntry::getCurrentPosition).sum();
+
+    totalLength += finishedStreamEntries.stream()
+        .filter(c -> !((ECBlockOutputStreamEntry) c).isParityStreamEntry())
+        .mapToLong(BlockOutputStreamEntry::getCurrentPosition).sum();
     return totalLength;
   }
 
@@ -130,8 +129,24 @@ public class ECBlockOutputStreamEntryPool extends BlockOutputStreamEntryPool {
 
   void executePutBlockForAll() throws IOException {
     List<BlockOutputStreamEntry> streamEntries = getStreamEntries();
+    int failedStreams = 0;
     for (int i = 0; i < streamEntries.size(); i++) {
-      ((ECBlockOutputStreamEntry) streamEntries.get(i)).executePutBlock();
+      ECBlockOutputStreamEntry ecBlockOutputStreamEntry =
+          (ECBlockOutputStreamEntry) streamEntries.get(i);
+      if (!ecBlockOutputStreamEntry.isClosed()) {
+        if (!ecBlockOutputStreamEntry.isInitialized()) {
+          // Stream not initialized, meaning it was never written to.
+          continue;
+        }
+        ecBlockOutputStreamEntry.executePutBlock();
+      } else {
+        failedStreams++;
+      }
+    }
+    if (failedStreams > ecReplicationConfig.getParity()) {
+      throw new IOException(
+          "There are " + failedStreams + " failures, more than the supported"
+              + " tolerance: " + ecReplicationConfig.getParity());
     }
   }
 
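The tolerance rule added to executePutBlockForAll is the usual EC bound: with
EC(d, p), a block group survives at most p failed streams. A standalone
sketch of just that check, under the same naming assumptions:

    import java.io.IOException;

    public final class ToleranceSketch {
      // Mirrors the new check: more than 'parity' failed streams is fatal.
      static void checkFailureTolerance(int failedStreams, int parity)
          throws IOException {
        if (failedStreams > parity) {
          throw new IOException("There are " + failedStreams
              + " failures, more than the supported tolerance: " + parity);
        }
      }

      public static void main(String[] args) throws IOException {
        checkFailureTolerance(2, 2); // fine: exactly at the tolerance limit
        checkFailureTolerance(3, 2); // throws IOException
      }
    }
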
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 474dcab..2f1f815 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.client.io;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
@@ -192,74 +193,102 @@ public class ECKeyOutputStream extends KeyOutputStream {
     int maxLenToCurrChunkBuffer = (int) Math.min(len, ecChunkSize);
     int currentWriterChunkLenToWrite =
         Math.min(currentChunkBufferRemainingLength, maxLenToCurrChunkBuffer);
-    handleWrite(b, off, currentWriterChunkLenToWrite,
-        currentChunkBufferLen + currentWriterChunkLenToWrite == ecChunkSize,
-        false);
-    checkAndWriteParityCells();
+    int pos = handleDataWrite(blockOutputStreamEntryPool.getCurrIdx(), b, off,
+        currentWriterChunkLenToWrite,
+        currentChunkBufferLen + currentWriterChunkLenToWrite == ecChunkSize);
+    checkAndWriteParityCells(pos);
 
     int remLen = len - currentWriterChunkLenToWrite;
     int iters = remLen / ecChunkSize;
     int lastCellSize = remLen % ecChunkSize;
     while (iters > 0) {
-      handleWrite(b, off, ecChunkSize, true, false);
+      pos = handleDataWrite(blockOutputStreamEntryPool.getCurrIdx(), b, off,
+          ecChunkSize, true);
       off += ecChunkSize;
       iters--;
-      checkAndWriteParityCells();
+      checkAndWriteParityCells(pos);
     }
 
     if (lastCellSize > 0) {
-      handleWrite(b, off, lastCellSize, false, false);
-      checkAndWriteParityCells();
+      pos = handleDataWrite(blockOutputStreamEntryPool.getCurrIdx(), b, off,
+          lastCellSize, false);
+      checkAndWriteParityCells(pos);
     }
     writeOffset += len;
   }
 
-  private void checkAndWriteParityCells() throws IOException {
+  private void checkAndWriteParityCells(int lastDataBuffPos)
+      throws IOException {
     //check data blocks finished
     //If index is > datanum blks
-    if (blockOutputStreamEntryPool.getCurrIdx() == numDataBlks) {
+    if (blockOutputStreamEntryPool
+        .getCurrIdx() == numDataBlks && lastDataBuffPos == ecChunkSize) {
       //Lets encode and write
-      //encoder.encode();
-      writeParityCells();
-      // By this time, we should have finished full stripe. So, lets call
-      // executePutBlock for all.
-      // TODO: we should alter the put block calls to share CRC to each stream.
-      blockOutputStreamEntryPool.executePutBlockForAll();
-      ecChunkBufferCache.clear();
-
-      // check if block ends?
-      if (currentBlockGroupLen == numDataBlks * blockOutputStreamEntryPool
-          .getStreamEntries().get(blockOutputStreamEntryPool.getCurrIdx())
-          .getLength()) {
-        blockOutputStreamEntryPool.endECBlock();
-        currentBlockGroupLen = 0;
-      }
+      handleParityWrites(ecChunkSize);
     }
   }
 
-  void writeParityCells() throws IOException {
-    final ByteBuffer[] buffers = ecChunkBufferCache.getDataBuffers();
-    //encode the data cells
-    for (int i = 0; i < numDataBlks; i++) {
-      buffers[i].flip();
+  private void handleParityWrites(int parityCellSize) throws IOException {
+    writeParityCells(parityCellSize);
+    // By this time, we should have finished a full stripe. So, let's call
+    // executePutBlock for all.
+    // TODO: we should alter the put block calls to share CRC to each stream.
+    blockOutputStreamEntryPool.executePutBlockForAll();
+    ecChunkBufferCache.clear();
+
+    // Check whether the current block group ends here.
+    if (shouldEndBlockGroup()) {
+      blockOutputStreamEntryPool.endECBlock();
+      currentBlockGroupLen = 0;
     }
+  }
+
+  private boolean shouldEndBlockGroup() {
+    return currentBlockGroupLen == numDataBlks * blockOutputStreamEntryPool
+        .getStreamEntries().get(blockOutputStreamEntryPool.getCurrIdx())
+        .getLength();
+  }
 
+  void writeParityCells(int parityCellSize) throws IOException {
+    final ByteBuffer[] buffers = ecChunkBufferCache.getDataBuffers();
+    ecChunkBufferCache.allocateParityBuffers(parityCellSize);
     final ByteBuffer[] parityBuffers = ecChunkBufferCache.getParityBuffers();
+
+    for (int i = 0; i < buffers.length; i++) {
+      buffers[i].flip();
+    }
     encoder.encode(buffers, parityBuffers);
     for (int i =
          numDataBlks; i < (this.numDataBlks + this.numParityBlks); i++) {
-      handleWrite(parityBuffers[i - numDataBlks].array(), 0, ecChunkSize, true,
-          true);
+      // Move the stream entry cursor to the parity block index.
+      blockOutputStreamEntryPool.setCurrIdx(i);
+      handleParityWrite(i, parityBuffers[i - numDataBlks].array(), 0,
+          ecChunkSize, true);
     }
   }
 
-  private void handleWrite(byte[] b, int off, long len, boolean isFullCell,
-      boolean isParity) throws IOException {
-    if (!isParity) {
-      ecChunkBufferCache
-          .addToDataBuffer(blockOutputStreamEntryPool.getCurrIdx(), b, off,
-              (int) len);
+  private int handleDataWrite(int currIdx, byte[] b, int off, long len,
+      boolean isFullCell) throws IOException {
+    int pos = ecChunkBufferCache.addToDataBuffer(currIdx, b, off, (int) len);
+    handleOutputStreamWrite(currIdx, b, off, len, isFullCell, false);
+
+    if (pos == ecChunkSize) {
+      blockOutputStreamEntryPool
+          .updateToNextStream(numDataBlks + numParityBlks);
     }
+    return pos;
+  }
+
+  private void handleParityWrite(int currIdx, byte[] b, int off, long len,
+      boolean isFullCell) throws IOException {
+    handleOutputStreamWrite(currIdx, b, off, len, isFullCell, true);
+    blockOutputStreamEntryPool
+        .updateToNextStream(numDataBlks + numParityBlks);
+  }
+
+  private void handleOutputStreamWrite(int currIdx, byte[] b, int off, long len,
+      boolean isFullCell, boolean isParity) throws IOException {
+
     BlockOutputStreamEntry current =
         blockOutputStreamEntryPool.allocateBlockIfNeeded();
     int writeLengthToCurrStream =
@@ -273,10 +302,8 @@ public class ECKeyOutputStream extends KeyOutputStream {
     len -= writeLengthToCurrStream;
     if (isFullCell) {
       ByteBuffer bytesToWrite = isParity ?
-          ecChunkBufferCache.getParityBuffers()[blockOutputStreamEntryPool
-              .getCurrIdx() - numDataBlks] :
-          ecChunkBufferCache.getDataBuffers()[blockOutputStreamEntryPool
-              .getCurrIdx()];
+          ecChunkBufferCache.getParityBuffers()[currIdx - numDataBlks] :
+          ecChunkBufferCache.getDataBuffers()[currIdx];
       try {
         writeToOutputStream(current, len, bytesToWrite.array(),
             bytesToWrite.array().length, 0, current.getWrittenDataLength(),
@@ -284,9 +311,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
       } catch (Exception e) {
         markStreamClosed();
       }
-
-      blockOutputStreamEntryPool
-          .updateToNextStream(numDataBlks + numParityBlks);
     }
   }
 
@@ -462,6 +486,33 @@ public class ECKeyOutputStream extends KeyOutputStream {
     closed = true;
     try {
       handleFlushOrCloseAllStreams(StreamAction.CLOSE);
+      if (isPartialStripe()) {
+        ByteBuffer bytesToWrite =
+            ecChunkBufferCache.getDataBuffers()[blockOutputStreamEntryPool
+                .getCurrIdx()];
+
+        // Finish writing the current partial cached chunk
+        if (bytesToWrite.position() % ecChunkSize != 0) {
+          final BlockOutputStreamEntry current =
+              blockOutputStreamEntryPool.getCurrentStreamEntry();
+          try {
+            byte[] array = bytesToWrite.array();
+            writeToOutputStream(current, bytesToWrite.position(), array,
+                bytesToWrite.position(), 0, current.getWrittenDataLength(),
+                false);
+          } catch (Exception e) {
+            markStreamClosed();
+          }
+        }
+        final int lastStripeSize =
+            (int) (currentBlockGroupLen % (numDataBlks * ecChunkSize));
+
+        final int parityCellSize =
+            lastStripeSize < ecChunkSize ? lastStripeSize : ecChunkSize;
+        addPadding(parityCellSize);
+        handleParityWrites(parityCellSize);
+      }
+
       if (!isException) {
         Preconditions.checkArgument(writeOffset == offset);
       }
@@ -473,6 +524,31 @@ public class ECKeyOutputStream extends KeyOutputStream {
     ecChunkBufferCache.release();
   }
 
+  private void addPadding(int parityCellSize) {
+    ByteBuffer[] buffers = ecChunkBufferCache.getDataBuffers();
+
+    for (int i = 1; i < numDataBlks; i++) {
+      final int position = buffers[i].position();
+      assert position <= parityCellSize : "If an internal block is smaller"
+          + " than the parity block, then its last cell should be smaller"
+          + " than the last parity cell";
+      padBufferToLimit(buffers[i], parityCellSize);
+    }
+  }
+
+  public static void padBufferToLimit(ByteBuffer buf, int limit) {
+    int pos = buf.position();
+    if (pos >= limit) {
+      return;
+    }
+    Arrays.fill(buf.array(), pos, limit, (byte) 0);
+    buf.position(limit);
+  }
+
+  private boolean isPartialStripe() {
+    return currentBlockGroupLen % (numDataBlks * ecChunkSize) > 0;
+  }
+
   public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
     return blockOutputStreamEntryPool.getCommitUploadPartInfo();
   }
@@ -593,7 +669,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
       dataBuffers = new ByteBuffer[this.dataBlks];
       parityBuffers = new ByteBuffer[this.parityBlks];
       allocateBuffers(cellSize, dataBuffers);
-      allocateBuffers(cellSize, parityBuffers);
     }
 
     private ByteBuffer[] getDataBuffers() {
@@ -604,6 +679,10 @@ public class ECKeyOutputStream extends KeyOutputStream {
       return parityBuffers;
     }
 
+    public void allocateParityBuffers(int size) {
+      allocateBuffers(size, parityBuffers);
+    }
+
     private int addToDataBuffer(int i, byte[] b, int off, int len) {
       final ByteBuffer buf = dataBuffers[i];
       final int pos = buf.position() + len;
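
The reworked writeParityCells above reduces to: allocate parity buffers
lazily at the per-stripe cell size, flip every data buffer into read mode,
then encode. A self-contained sketch of that sequence; the Encoder interface
is a stand-in for Ozone's RS raw encoder, not the actual API:

    import java.nio.ByteBuffer;

    final class ParityEncodeSketch {
      // Assumed stand-in for the erasure-coding raw encoder.
      interface Encoder {
        void encode(ByteBuffer[] data, ByteBuffer[] parity);
      }

      static ByteBuffer[] encodeStripe(Encoder encoder,
          ByteBuffer[] dataBuffers, int numParityBlks, int parityCellSize) {
        // Parity buffers are sized per stripe, so a short last stripe
        // gets correspondingly short parity cells.
        ByteBuffer[] parityBuffers = new ByteBuffer[numParityBlks];
        for (int i = 0; i < numParityBlks; i++) {
          parityBuffers[i] = ByteBuffer.allocate(parityCellSize);
        }
        for (ByteBuffer buf : dataBuffers) {
          buf.flip(); // switch each data cell to read mode before encoding
        }
        encoder.encode(dataBuffers, parityBuffers);
        return parityBuffers;
      }
    }
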
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
index e43401f..804831a 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
@@ -72,20 +72,23 @@ public class TestOzoneClient {
   @Before
   public void init() throws IOException {
     ConfigurationSource config = new InMemoryConfiguration();
+    createNewClient(config, new SinglePipelineBlockAllocator());
+  }
+
+  private void createNewClient(ConfigurationSource config,
+      MockBlockAllocator blkAllocator) throws IOException {
     client = new OzoneClient(config, new RpcClient(config, null) {
 
       @Override
-      protected OmTransport createOmTransport(
-          String omServiceId)
+      protected OmTransport createOmTransport(String omServiceId)
           throws IOException {
-        return new MockOmTransport();
+        return new MockOmTransport(blkAllocator);
       }
 
       @NotNull
       @Override
       protected XceiverClientFactory createXceiverClientFactory(
-          List<X509Certificate> x509Certificates)
-          throws IOException {
+          List<X509Certificate> x509Certificates) throws IOException {
         return new MockXceiverClientFactory();
       }
     });
@@ -186,6 +189,11 @@ public class TestOzoneClient {
 
   @Test
   public void testPutKeyWithECReplicationConfig() throws IOException {
+    close();
+    ConfigurationSource config = new InMemoryConfiguration();
+    int data = 3;
+    int parity = 2;
+    createNewClient(config, new MultiNodePipelineBlockAllocator(data + parity));
     String value = new String(new byte[1024], UTF_8);
     OzoneBucket bucket = getOzoneBucket();
 
@@ -193,7 +201,7 @@ public class TestOzoneClient {
       String keyName = UUID.randomUUID().toString();
       try (OzoneOutputStream out = bucket
           .createKey(keyName, value.getBytes(UTF_8).length,
-              new ECReplicationConfig(3, 2), new HashMap<>())) {
+              new ECReplicationConfig(data, parity), new HashMap<>())) {
         out.write(value.getBytes(UTF_8));
         out.write(value.getBytes(UTF_8));
       }
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index 70d4406..9591e27 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -69,6 +69,8 @@ public class TestOzoneECClient {
   private OzoneClient client;
   private ObjectStore store;
   private String keyName = UUID.randomUUID().toString();
+  private String volumeName = UUID.randomUUID().toString();
+  private String bucketName = UUID.randomUUID().toString();
   private byte[][] inputChunks = new byte[dataBlocks][chunkSize];
   private final XceiverClientFactory factoryStub =
       new MockXceiverClientFactory();
@@ -225,11 +227,91 @@ public class TestOzoneECClient {
     }
   }
 
+  @Test
+  public void testPartialStripeWithSingleChunkAndPadding() throws IOException {
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    try (OzoneOutputStream out = bucket.createKey(keyName, 2000,
+        new ECReplicationConfig(dataBlocks, parityBlocks), new HashMap<>())) {
+      for (int i = 0; i < inputChunks[0].length; i++) {
+        out.write(inputChunks[0][i]);
+      }
+    }
+
+    OzoneKey key = bucket.getKey(keyName);
+    Assert.assertEquals(keyName, key.getName());
+    try (OzoneInputStream is = bucket.readKey(keyName)) {
+      byte[] fileContent = new byte[1024];
+      Assert.assertEquals(inputChunks[0].length, is.read(fileContent));
+      Assert.assertEquals(new String(inputChunks[0], UTF_8),
+          new String(fileContent, UTF_8));
+    }
+  }
+
+  @Test
+  public void testPartialStripeLessThanSingleChunkWithPadding()
+      throws IOException {
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    try (OzoneOutputStream out = bucket.createKey(keyName, 2000,
+        new ECReplicationConfig(dataBlocks, parityBlocks), new HashMap<>())) {
+      for (int i = 0; i < inputChunks[0].length - 1; i++) {
+        out.write(inputChunks[0][i]);
+      }
+    }
+
+    OzoneKey key = bucket.getKey(keyName);
+    Assert.assertEquals(keyName, key.getName());
+    try (OzoneInputStream is = bucket.readKey(keyName)) {
+      byte[] fileContent = new byte[1023];
+      Assert.assertEquals(inputChunks[0].length - 1, is.read(fileContent));
+      Assert.assertEquals(
+          new String(Arrays.copyOf(inputChunks[0], inputChunks[0].length - 1),
+              UTF_8), new String(fileContent, UTF_8));
+    }
+  }
+
+  @Test
+  public void testPartialStripeWithPartialLastChunk()
+      throws IOException {
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    byte[] lastChunk = inputChunks[inputChunks.length - 1];
+
+    try (OzoneOutputStream out = bucket.createKey(keyName, 2000,
+        new ECReplicationConfig(dataBlocks, parityBlocks), new HashMap<>())) {
+      for (int i = 0; i < inputChunks.length - 1; i++) {
+        out.write(inputChunks[i]);
+      }
+
+      for (int i = 0; i < lastChunk.length - 1; i++) {
+        out.write(lastChunk[i]);
+      }
+    }
+
+    // Keep only the 3rd node in the pipeline, so that the 3rd chunk can
+    // be read.
+    updatePipelineToKeepSingleNode(3);
+    try (OzoneInputStream is = bucket.readKey(keyName)) {
+      byte[] fileContent = new byte[1023];
+      Assert.assertEquals(lastChunk.length - 1, is.read(fileContent));
+      Assert.assertEquals(
+          new String(Arrays.copyOf(lastChunk, lastChunk.length - 1), UTF_8),
+          new String(fileContent, UTF_8));
+    }
+  }
 
   private OzoneBucket writeIntoECKey(byte[][] chunks, String key,
       DefaultReplicationConfig defaultReplicationConfig) throws IOException {
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
     store.createVolume(volumeName);
     OzoneVolume volume = store.getVolume(volumeName);
     if (defaultReplicationConfig != null) {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
