This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new 074203f  HDDS-6437. EC: Avoid allocating buffers in EC Reconstruction Streams until first read (#3181)
074203f is described below

commit 074203fc0aa23a7a086a879f5a639bbc087a8f17
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Mon Mar 14 19:43:08 2022 +0000

    HDDS-6437. EC: Avoid allocating buffers in EC Reconstruction Streams until first read (#3181)
---
 .../client/io/ECBlockReconstructedInputStream.java | 14 +++++++----
 .../io/ECBlockReconstructedStripeInputStream.java  | 29 +++++++++++++---------
 2 files changed, 26 insertions(+), 17 deletions(-)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
index 6894719..034a3a2 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
@@ -47,8 +47,6 @@ public class ECBlockReconstructedInputStream extends BlockExtendedInputStream {
     this.repConfig = repConfig;
     this.byteBufferPool = byteBufferPool;
     this.stripeReader = stripeReader;
-
-    allocateBuffers();
   }
 
   @Override
@@ -75,6 +73,7 @@ public class ECBlockReconstructedInputStream extends BlockExtendedInputStream {
   @Override
   public synchronized int read(ByteBuffer buf) throws IOException {
     ensureNotClosed();
+    allocateBuffers();
     if (!hasRemaining()) {
       return EOF;
     }
@@ -142,9 +141,11 @@ public class ECBlockReconstructedInputStream extends BlockExtendedInputStream {
   @Override
   public synchronized void close() throws IOException {
     stripeReader.close();
-    for (int i = 0; i < bufs.length; i++) {
-      byteBufferPool.putBuffer(bufs[i]);
-      bufs[i] = null;
+    if (bufs != null) {
+      for (int i = 0; i < bufs.length; i++) {
+        byteBufferPool.putBuffer(bufs[i]);
+        bufs[i] = null;
+      }
     }
     closed = true;
   }
@@ -193,6 +194,9 @@ public class ECBlockReconstructedInputStream extends BlockExtendedInputStream {
   }
 
   private void allocateBuffers() {
+    if (bufs != null) {
+      return;
+    }
     bufs = new ByteBuffer[repConfig.getData()];
     for (int i = 0; i < repConfig.getData(); i++) {
       bufs[i] = byteBufferPool.getBuffer(false, repConfig.getEcChunkSize());
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
index 91c3797..2b8d231 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -108,7 +108,7 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
   private Set<Integer> failedDataIndexes = new HashSet<>();
   private ByteBufferPool byteBufferPool;
 
-  private final RawErasureDecoder decoder;
+  private RawErasureDecoder decoder;
 
   private boolean initialized = false;
 
@@ -125,11 +125,6 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
         refreshFunction, streamFactory);
     this.byteBufferPool = byteBufferPool;
     this.executor = ecReconstructExecutor;
-    decoder = CodecUtil.createRawDecoderWithFallback(repConfig);
-
-    // The EC decoder needs an array data+parity long, with missing or not
-    // needed indexes set to null.
-    decoderInputBuffers = new ByteBuffer[getRepConfig().getRequiredNodes()];
   }
 
   /**
@@ -160,6 +155,14 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
   }
 
   protected void init() throws InsufficientLocationsException {
+    if (decoder == null) {
+      decoder = CodecUtil.createRawDecoderWithFallback(getRepConfig());
+    }
+    if (decoderInputBuffers == null) {
+      // The EC decoder needs an array data+parity long, with missing or not
+      // needed indexes set to null.
+      decoderInputBuffers = new ByteBuffer[getRepConfig().getRequiredNodes()];
+    }
     if (!hasSufficientLocations()) {
       throw new InsufficientLocationsException("There are insufficient " +
           "datanodes to read the EC block");
@@ -578,12 +581,14 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
     // Inside this class, we only allocate buffers to read parity into. Data
     // is reconstructed or read into a set of buffers passed in from the calling
     // class. Therefore we only need to ensure we free the parity buffers here.
-    for (int i = getRepConfig().getData();
-         i < getRepConfig().getRequiredNodes(); i++) {
-      ByteBuffer buf = decoderInputBuffers[i];
-      if (buf != null) {
-        byteBufferPool.putBuffer(buf);
-        decoderInputBuffers[i] = null;
+    if (decoderInputBuffers != null) {
+      for (int i = getRepConfig().getData();
+           i < getRepConfig().getRequiredNodes(); i++) {
+        ByteBuffer buf = decoderInputBuffers[i];
+        if (buf != null) {
+          byteBufferPool.putBuffer(buf);
+          decoderInputBuffers[i] = null;
+        }
       }
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to