This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new b9adbcd  HDDS-6348: EC: PartialStripe failure handling logic is 
writing padding bytes also to DNs (#3124)
b9adbcd is described below

commit b9adbcd89ad84d71e73ddff26f4e039c6aeba499
Author: Uma Maheswara Rao G <[email protected]>
AuthorDate: Wed Feb 23 21:30:18 2022 -0800

    HDDS-6348: EC: PartialStripe failure handling logic is writing padding 
bytes also to DNs (#3124)
---
 .../client/io/BlockOutputStreamEntryPool.java      |   3 +-
 .../hadoop/ozone/client/io/ECKeyOutputStream.java  | 101 +++++++++++----------
 .../hadoop/ozone/client/TestOzoneECClient.java     |  66 ++++++++++++++
 3 files changed, 123 insertions(+), 47 deletions(-)

diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index 1c09823..856d486 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -324,7 +324,8 @@ public class BlockOutputStreamEntryPool {
     if (keyArgs != null) {
       // in test, this could be null
       long length = getKeyLength();
-      Preconditions.checkArgument(offset == length);
+      Preconditions.checkArgument(offset == length,
+          "Expected offset: " + offset + " expected len: " + length);
       keyArgs.setDataSize(length);
       keyArgs.setLocationInfoList(getLocationInfoList());
       // When the key is multipart upload part file upload, we should not
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 9ef3d06..266ee90 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -196,10 +196,10 @@ public class ECKeyOutputStream extends KeyOutputStream {
   }
 
   private StripeWriteStatus rewriteStripeToNewBlockGroup(
-      int failedStripeDataSize, boolean allocateBlockIfFull, boolean close)
+      long failedStripeDataSize, boolean allocateBlockIfFull, boolean close)
       throws IOException {
-    long[] failedDataStripeChunkLens = new long[numDataBlks];
-    long[] failedParityStripeChunkLens = new long[numParityBlks];
+    int[] failedDataStripeChunkLens = new int[numDataBlks];
+    int[] failedParityStripeChunkLens = new int[numParityBlks];
     final ByteBuffer[] dataBuffers = ecChunkBufferCache.getDataBuffers();
     for (int i = 0; i < numDataBlks; i++) {
       failedDataStripeChunkLens[i] = dataBuffers[i].limit();
@@ -227,18 +227,18 @@ public class ECKeyOutputStream extends KeyOutputStream {
         blockOutputStreamEntryPool.getCurrentStreamEntry();
     long totalLenToWrite = failedStripeDataSize;
     for (int i = 0; i < numDataBlks; i++) {
-      long currentLen = totalLenToWrite < failedDataStripeChunkLens[i] ?
+      int currentLen = (int) (totalLenToWrite < failedDataStripeChunkLens[i] ?
           totalLenToWrite :
-          failedDataStripeChunkLens[i];
+          failedDataStripeChunkLens[i]);
       if (currentLen > 0) {
-        handleOutputStreamWrite(i, currentLen, true, false);
+        handleOutputStreamWrite(i, currentLen, false);
       }
       currentStreamEntry.useNextBlockStream();
       totalLenToWrite -= currentLen;
     }
     for (int i = 0; i < (numParityBlks); i++) {
       handleOutputStreamWrite(i + numDataBlks, failedParityStripeChunkLens[i],
-          true, true);
+          true);
       currentStreamEntry.useNextBlockStream();
     }
 
@@ -373,47 +373,51 @@ public class ECKeyOutputStream extends KeyOutputStream {
     for (int i =
          numDataBlks; i < (this.numDataBlks + this.numParityBlks); i++) {
       // Move the stream entry cursor to parity block index
-      handleParityWrite(i, parityCellSize, true);
+      handleParityWrite(i, parityCellSize);
     }
   }
 
   private int handleDataWrite(int currIdx, byte[] b, int off, long len,
-      boolean isFullCell) throws IOException {
+      boolean isFullCell) {
     int pos = ecChunkBufferCache.addToDataBuffer(currIdx, b, off, (int) len);
-    handleOutputStreamWrite(currIdx, len, isFullCell, false);
 
-    if (pos == ecChunkSize) {
+    if (isFullCell) {
+      Preconditions.checkArgument(pos == ecChunkSize,
+          "When full cell passed, the pos: " + pos
+              + " should match to ec chunk size.");
+      handleOutputStreamWrite(currIdx, pos, false);
       blockOutputStreamEntryPool.getCurrentStreamEntry().useNextBlockStream();
     }
     return pos;
   }
 
-  private void handleParityWrite(int currIdx, long len, boolean isFullCell) {
-    handleOutputStreamWrite(currIdx, len, isFullCell, true);
+  private void handleParityWrite(int currIdx, int len) {
+    handleOutputStreamWrite(currIdx, len, true);
     blockOutputStreamEntryPool.getCurrentStreamEntry().useNextBlockStream();
   }
 
-  private void handleOutputStreamWrite(int currIdx, long len,
-      boolean isFullCell, boolean isParity) {
-
-    BlockOutputStreamEntry current =
-        blockOutputStreamEntryPool.getCurrentStreamEntry();
-
-    if (isFullCell) {
-      ByteBuffer bytesToWrite = isParity ?
-          ecChunkBufferCache.getParityBuffers()[currIdx - numDataBlks] :
-          ecChunkBufferCache.getDataBuffers()[currIdx];
-      try {
-        // Since it's a fullcell, let's write all content from buffer.
-        writeToOutputStream(current, len, bytesToWrite.array(),
-            bytesToWrite.limit(), 0, isParity);
-      } catch (Exception e) {
-        markStreamAsFailed(e);
-      }
-    }
-  }
-
-  private int writeToOutputStream(BlockOutputStreamEntry current, long len,
+  private void handleOutputStreamWrite(int currIdx, int len, boolean isParity) 
{
+    ByteBuffer bytesToWrite = isParity ?
+        ecChunkBufferCache.getParityBuffers()[currIdx - numDataBlks] :
+        ecChunkBufferCache.getDataBuffers()[currIdx];
+    try {
+      // Since it's a full cell, let's write all content from buffer.
+      // At a time we write max cell size in EC. So, it should be safe to cast
+      // the len to int to use the super class defined write API.
+      // The len cannot be bigger than cell buffer size.
+      assert len <= ecChunkSize : " The len: " + len + ". EC chunk size: "
+          + ecChunkSize;
+      assert len <= bytesToWrite
+          .limit() : " The len: " + len + ". Chunk buffer limit: "
+          + bytesToWrite.limit();
+      writeToOutputStream(blockOutputStreamEntryPool.getCurrentStreamEntry(),
+          bytesToWrite.array(), len, 0, isParity);
+    } catch (Exception e) {
+      markStreamAsFailed(e);
+    }
+  }
+
+  private long writeToOutputStream(ECBlockOutputStreamEntry current,
       byte[] b, int writeLen, int off, boolean isParity)
       throws IOException {
     try {
@@ -424,8 +428,11 @@ public class ECKeyOutputStream extends KeyOutputStream {
       }
       current.write(b, off, writeLen);
     } catch (IOException ioe) {
-      LOG.debug("Exception:: writeLen: " + writeLen + ", total len:" + len,
-          ioe);
+      LOG.debug(
+          "Exception while writing the cell buffers. The writeLen: " + writeLen
+              + ". The block internal index is: "
+              + current
+              .getCurrentStreamIdx(), ioe);
       handleException(current, ioe);
     }
     return writeLen;
@@ -494,7 +501,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
     }
     closed = true;
     try {
-      final int lastStripeSize = getCurrentDataStripeSize();
+      final long lastStripeSize = getCurrentDataStripeSize();
       if (isPartialStripe(lastStripeSize)) {
         ByteBuffer bytesToWrite =
             ecChunkBufferCache.getDataBuffers()[blockOutputStreamEntryPool
@@ -502,11 +509,11 @@ public class ECKeyOutputStream extends KeyOutputStream {
 
         // Finish writing the current partial cached chunk
         if (bytesToWrite.position() % ecChunkSize != 0) {
-          final BlockOutputStreamEntry current =
+          final ECBlockOutputStreamEntry current =
               blockOutputStreamEntryPool.getCurrentStreamEntry();
           try {
             byte[] array = bytesToWrite.array();
-            writeToOutputStream(current, bytesToWrite.position(), array,
+            writeToOutputStream(current, array,
                 bytesToWrite.position(), 0, false);
           } catch (Exception e) {
             markStreamAsFailed(e);
@@ -514,7 +521,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
         }
 
         final int parityCellSize =
-            lastStripeSize < ecChunkSize ? lastStripeSize : ecChunkSize;
+            (int) (lastStripeSize < ecChunkSize ? lastStripeSize : 
ecChunkSize);
         addPadding(parityCellSize);
         if (handleParityWrites(parityCellSize,
             false, true) == StripeWriteStatus.FAILED) {
@@ -529,7 +536,9 @@ public class ECKeyOutputStream extends KeyOutputStream {
       }
 
       closeCurrentStreamEntry();
-      Preconditions.checkArgument(writeOffset == offset);
+      Preconditions.checkArgument(writeOffset == offset,
+          "Expected writeOffset= " + writeOffset
+              + " Expected offset=" + offset);
       blockOutputStreamEntryPool.commitKey(offset);
     } finally {
       blockOutputStreamEntryPool.cleanup();
@@ -537,7 +546,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
     ecChunkBufferCache.release();
   }
 
-  private void handleStripeFailure(int lastStripeSize,
+  private void handleStripeFailure(long lastStripeSize,
       boolean allocateBlockIfFull, boolean isClose)
       throws IOException {
     StripeWriteStatus stripeWriteStatus;
@@ -575,13 +584,13 @@ public class ECKeyOutputStream extends KeyOutputStream {
     buf.position(limit);
   }
 
-  private boolean isPartialStripe(int stripeSize) {
-    return stripeSize > 0 && stripeSize < numDataBlks * ecChunkSize;
+  private boolean isPartialStripe(long stripeSize) {
+    return stripeSize > 0 && stripeSize < (numDataBlks * ecChunkSize);
   }
 
-  private int getCurrentDataStripeSize() {
+  private long getCurrentDataStripeSize() {
     final ByteBuffer[] dataBuffers = ecChunkBufferCache.getDataBuffers();
-    int lastStripeSize = 0;
+    long lastStripeSize = 0;
     for (int i = 0; i < numDataBlks; i++) {
       lastStripeSize += dataBuffers[i].position();
     }
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index 8b2a8a1..8a17d81 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -843,6 +843,72 @@ public class TestOzoneECClient {
     }
   }
 
+  @Test
+  public void testPartialStripeWithPartialChunkRetry()
+      throws IOException {
+    close();
+    OzoneConfiguration con = new OzoneConfiguration();
+    // block size of 3KB could hold 3 full stripes
+    con.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 3, 
StorageUnit.KB);
+    con.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_EC_STRIPE_WRITE_RETRIES, 3);
+    MultiNodePipelineBlockAllocator blkAllocator =
+        new MultiNodePipelineBlockAllocator(con, dataBlocks + parityBlocks, 
15);
+    createNewClient(con, blkAllocator);
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+    int numFullChunks = 7;
+    //Prepare additional partial chunk.
+    int partialChunkSize = 1020;
+    byte[] partialChunk = new byte[partialChunkSize];
+    Arrays.fill(partialChunk, 0, partialChunk.length, "1".getBytes(UTF_8)[0]);
+
+    // A partial chunk to trigger partialStripe check
+    // in ECKeyOutputStream.close()
+    int inSize = chunkSize;
+    try (OzoneOutputStream out = bucket.createKey(keyName, inSize,
+        new ECReplicationConfig(dataBlocks, parityBlocks,
+            ECReplicationConfig.EcCodec.RS, chunkSize), new HashMap<>())) {
+      for (int i = 0; i < numFullChunks; i++) {
+        out.write(inputChunks[i % dataBlocks]);
+      }
+
+      out.write(partialChunk);
+
+      int[] nodesIndexesToMarkFailure = new int[] {0, 4};
+      List<DatanodeDetails> failedDNs = new ArrayList<>();
+      List<HddsProtos.DatanodeDetailsProto> dns = blkAllocator.getClusterDns();
+      for (int j = 0; j < nodesIndexesToMarkFailure.length; j++) {
+        failedDNs.add(DatanodeDetails
+            .getFromProtoBuf(dns.get(nodesIndexesToMarkFailure[j])));
+      }
+
+      // First let's set storage as bad
+      ((MockXceiverClientFactory) factoryStub).setFailedStorages(failedDNs);
+
+    }
+
+    try (OzoneInputStream is = bucket.readKey(keyName)) {
+      byte[] fileContent = new byte[chunkSize];
+      for (int i = 0; i < numFullChunks; i++) {
+        Assert.assertEquals(inputChunks[i % dataBlocks].length,
+            is.read(fileContent));
+        Assert.assertTrue("Expected: " + new String(inputChunks[i % 
dataBlocks],
+                UTF_8) + " \n " + "Actual: " + new String(fileContent, UTF_8),
+            Arrays.equals(inputChunks[i % dataBlocks], fileContent));
+      }
+
+      byte[] partialChunkToRead = new byte[partialChunkSize];
+      Assert
+          .assertEquals(partialChunkToRead.length, 
is.read(partialChunkToRead));
+      Assert.assertTrue(Arrays.equals(partialChunk, partialChunkToRead));
+
+      Assert.assertEquals(-1, is.read(partialChunkToRead));
+    }
+  }
+
   @Test(expected = NotImplementedException.class)
   public void testFlushShouldThrowNotImplementedException() throws IOException 
{
     store.createVolume(volumeName);

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to