This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new 78ca327  HDDS-6358. EC: Refactor ECKeyOutputStream#write() (#3120)
78ca327 is described below

commit 78ca327e7791accfd2b72375a01b441b593ba376
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Feb 25 05:05:44 2022 +0800

    HDDS-6358. EC: Refactor ECKeyOutputStream#write() (#3120)
---
 .../hadoop/ozone/client/io/ECKeyOutputStream.java  | 101 ++++++++-------------
 .../hadoop/ozone/client/TestOzoneECClient.java     |   4 +-
 2 files changed, 38 insertions(+), 67 deletions(-)

diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 4be43e7..1066f0a 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -151,52 +151,34 @@ public class ECKeyOutputStream extends KeyOutputStream {
         || ((off + len) < 0)) {
       throw new IndexOutOfBoundsException();
     }
-    if (len == 0) {
-      return;
-    }
-    blockOutputStreamEntryPool.allocateBlockIfNeeded();
-
-    int currentStreamIdx = blockOutputStreamEntryPool.getCurrentStreamEntry()
-        .getCurrentStreamIdx();
-    int currentChunkBufferRemainingLength =
-        ecChunkBufferCache.dataBuffers[currentStreamIdx].remaining();
-    int currentChunkBufferLen =
-        ecChunkBufferCache.dataBuffers[currentStreamIdx]
-            .position();
-    int maxLenToCurrChunkBuffer = Math.min(len, ecChunkSize);
-    int currentWriterChunkLenToWrite =
-        Math.min(currentChunkBufferRemainingLength, maxLenToCurrChunkBuffer);
-    int pos = handleDataWrite(currentStreamIdx, b, off,
-        currentWriterChunkLenToWrite,
-        currentChunkBufferLen + currentWriterChunkLenToWrite == ecChunkSize);
-    checkAndWriteParityCells(pos, false);
-    int remLen = len - currentWriterChunkLenToWrite;
-    int iters = remLen / ecChunkSize;
-    int lastCellSize = remLen % ecChunkSize;
-    off += currentWriterChunkLenToWrite;
-
-    while (iters > 0) {
-      currentStreamIdx = blockOutputStreamEntryPool.getCurrentStreamEntry()
-          .getCurrentStreamIdx();
-      pos = handleDataWrite(currentStreamIdx, b, off, ecChunkSize, true);
-      off += ecChunkSize;
-      iters--;
-      checkAndWriteParityCells(pos, iters > 0 || remLen > 0);
-    }
-
-    if (lastCellSize > 0) {
-      currentStreamIdx = blockOutputStreamEntryPool.getCurrentStreamEntry()
-          .getCurrentStreamIdx();
-      pos = handleDataWrite(currentStreamIdx, b, off,
-          lastCellSize, false);
-      checkAndWriteParityCells(pos, false);
+    int rem = len;
+    while (rem > 0) {
+      try {
+        blockOutputStreamEntryPool.allocateBlockIfNeeded();
+        int currentStreamIdx = blockOutputStreamEntryPool
+            .getCurrentStreamEntry().getCurrentStreamIdx();
+        int bufferRem =
+            ecChunkBufferCache.dataBuffers[currentStreamIdx].remaining();
+        int expectedWriteLen = Math.min(rem, Math.min(bufferRem, ecChunkSize));
+        int oldPos =
+            ecChunkBufferCache.dataBuffers[currentStreamIdx].position();
+        int pos =
+            handleDataWrite(currentStreamIdx, b, off, expectedWriteLen,
+                oldPos + expectedWriteLen == ecChunkSize);
+        checkAndWriteParityCells(pos);
+        long writtenLength = pos - oldPos;
+        rem -= writtenLength;
+        off += writtenLength;
+      } catch (Exception e) {
+        markStreamClosed();
+        throw new IOException(e.getMessage());
+      }
     }
     writeOffset += len;
   }
 
   private StripeWriteStatus rewriteStripeToNewBlockGroup(
-      long failedStripeDataSize, boolean allocateBlockIfFull, boolean close)
-      throws IOException {
+      long failedStripeDataSize, boolean close) throws IOException {
     int[] failedDataStripeChunkLens = new int[numDataBlks];
     int[] failedParityStripeChunkLens = new int[numParityBlks];
     final ByteBuffer[] dataBuffers = ecChunkBufferCache.getDataBuffers();
@@ -259,9 +241,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
       // In most cases this should not happen except in the case stripe size 
and
       // block size same.
       newBlockGroupStreamEntry.close();
-      if (allocateBlockIfFull) {
-        blockOutputStreamEntryPool.allocateBlockIfNeeded();
-      }
     } else {
       newBlockGroupStreamEntry.resetToFirstEntry();
     }
@@ -269,20 +248,19 @@ public class ECKeyOutputStream extends KeyOutputStream {
     return StripeWriteStatus.SUCCESS;
   }
 
-  private void checkAndWriteParityCells(int lastDataBuffPos,
-      boolean allocateBlockIfFull) throws IOException {
-    //check data blocks finished
-    //If index is > datanum blks
+  private void checkAndWriteParityCells(int lastDataBuffPos)
+      throws IOException {
+    // Check data blocks finished
+    // If index > numDataBlks
     ECBlockOutputStreamEntry currentStreamEntry =
         blockOutputStreamEntryPool.getCurrentStreamEntry();
     int currentStreamIdx = currentStreamEntry.getCurrentStreamIdx();
     if (currentStreamIdx == numDataBlks && lastDataBuffPos == ecChunkSize) {
       //Lets encode and write
       boolean shouldClose = currentStreamEntry.getRemaining() <= 0;
-      if (handleParityWrites(ecChunkSize, allocateBlockIfFull,
-          shouldClose) == StripeWriteStatus.FAILED) {
-        handleStripeFailure(numDataBlks * ecChunkSize, allocateBlockIfFull,
-            shouldClose);
+      if (handleParityWrites(ecChunkSize, shouldClose)
+          == StripeWriteStatus.FAILED) {
+        handleStripeFailure(numDataBlks * ecChunkSize, shouldClose);
       } else {
         // At this stage stripe write is successful.
         currentStreamEntry.updateBlockGroupToAckedPosition(
@@ -293,8 +271,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
   }
 
   private StripeWriteStatus handleParityWrites(int parityCellSize,
-      boolean allocateBlockIfFull, boolean isLastStripe)
-      throws IOException {
+      boolean isLastStripe) throws IOException {
     writeParityCells(parityCellSize);
     if (hasWriteFailure()) {
       return StripeWriteStatus.FAILED;
@@ -315,9 +292,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
 
     if (streamEntry.getRemaining() <= 0) {
       streamEntry.close();
-      if (allocateBlockIfFull) {
-        blockOutputStreamEntryPool.allocateBlockIfNeeded();
-      }
     } else {
       streamEntry.resetToFirstEntry();
     }
@@ -522,9 +496,9 @@ public class ECKeyOutputStream extends KeyOutputStream {
         final int parityCellSize =
             (int) (lastStripeSize < ecChunkSize ? lastStripeSize : 
ecChunkSize);
         addPadding(parityCellSize);
-        if (handleParityWrites(parityCellSize,
-            false, true) == StripeWriteStatus.FAILED) {
-          handleStripeFailure(lastStripeSize, false, true);
+        if (handleParityWrites(parityCellSize, true)
+            == StripeWriteStatus.FAILED) {
+          handleStripeFailure(lastStripeSize, true);
         } else {
           blockOutputStreamEntryPool.getCurrentStreamEntry()
               .updateBlockGroupToAckedPosition(
@@ -545,14 +519,11 @@ public class ECKeyOutputStream extends KeyOutputStream {
     ecChunkBufferCache.release();
   }
 
-  private void handleStripeFailure(long lastStripeSize,
-      boolean allocateBlockIfFull, boolean isClose)
+  private void handleStripeFailure(long lastStripeSize, boolean isClose)
       throws IOException {
     StripeWriteStatus stripeWriteStatus;
     for (int i = 0; i < this.config.getMaxECStripeWriteRetries(); i++) {
-      stripeWriteStatus =
-          rewriteStripeToNewBlockGroup(lastStripeSize,
-              allocateBlockIfFull, isClose);
+      stripeWriteStatus = rewriteStripeToNewBlockGroup(lastStripeSize, 
isClose);
       if (stripeWriteStatus == StripeWriteStatus.SUCCESS) {
         return;
       }
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index bd9ac20..0ee6808 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -598,9 +598,9 @@ public class TestOzoneECClient {
         .size() == clusterSize);
   }
 
-  @Test(expected = IllegalStateException.class)
   // The mocked impl throws IllegalStateException when there are not enough
-  // nodes in allocateBlock request.
+  // nodes in allocateBlock request. But write() converts it to IOException.
+  @Test(expected = IOException.class)
   public void testStripeWriteRetriesOnAllNodeFailures() throws IOException {
     OzoneConfiguration con = new OzoneConfiguration();
     con.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 2, 
StorageUnit.KB);

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to