This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new 3eb55db  HDDS-5949. EC: Create reusable buffer pool shared by all EC 
input and output streams (#2929)
3eb55db is described below

commit 3eb55db0148f06686abbde49629ff21bceb6d2a9
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Mon Jan 10 21:13:39 2022 +0000

    HDDS-5949. EC: Create reusable buffer pool shared by all EC input and 
output streams (#2929)
---
 .../client/io/BlockInputStreamFactoryImpl.java     | 13 ++++-
 .../client/io/ECBlockInputStreamFactoryImpl.java   | 15 +++--
 .../client/io/ECBlockReconstructedInputStream.java | 15 ++++-
 .../io/ECBlockReconstructedStripeInputStream.java  | 20 ++++++-
 .../hadoop/ozone/client/io/ECKeyOutputStream.java  | 64 +++++++++++++---------
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  | 26 ++++++---
 .../read/TestECBlockReconstructedInputStream.java  | 26 ++++++---
 .../TestECBlockReconstructedStripeInputStream.java | 58 ++++++++------------
 8 files changed, 148 insertions(+), 89 deletions(-)

diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
index 6cdb991..1d46acd 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
 import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.security.token.Token;
 
@@ -38,13 +40,18 @@ public class BlockInputStreamFactoryImpl implements 
BlockInputStreamFactory {
 
   private ECBlockInputStreamFactory ecBlockStreamFactory;
 
-  public static BlockInputStreamFactory getInstance() {
-    return new BlockInputStreamFactoryImpl();
+  public static BlockInputStreamFactory getInstance(
+      ByteBufferPool byteBufferPool) {
+    return new BlockInputStreamFactoryImpl(byteBufferPool);
   }
 
   public BlockInputStreamFactoryImpl() {
+    this(new ElasticByteBufferPool());
+  }
+
+  public BlockInputStreamFactoryImpl(ByteBufferPool byteBufferPool) {
     this.ecBlockStreamFactory =
-        ECBlockInputStreamFactoryImpl.getInstance(this);
+        ECBlockInputStreamFactoryImpl.getInstance(this, byteBufferPool);
   }
 
   /**
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
index 75956c4..cc03e80 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 
 import java.util.List;
@@ -36,13 +37,16 @@ public final class ECBlockInputStreamFactoryImpl implements
     ECBlockInputStreamFactory {
 
   private final BlockInputStreamFactory inputStreamFactory;
+  private final ByteBufferPool byteBufferPool;
 
   public static ECBlockInputStreamFactory getInstance(
-      BlockInputStreamFactory streamFactory) {
-    return new ECBlockInputStreamFactoryImpl(streamFactory);
+      BlockInputStreamFactory streamFactory, ByteBufferPool byteBufferPool) {
+    return new ECBlockInputStreamFactoryImpl(streamFactory, byteBufferPool);
   }
 
-  private ECBlockInputStreamFactoryImpl(BlockInputStreamFactory streamFactory) 
{
+  private ECBlockInputStreamFactoryImpl(BlockInputStreamFactory streamFactory,
+      ByteBufferPool byteBufferPool) {
+    this.byteBufferPool = byteBufferPool;
     this.inputStreamFactory = streamFactory;
   }
 
@@ -72,12 +76,13 @@ public final class ECBlockInputStreamFactoryImpl implements
       ECBlockReconstructedStripeInputStream sis =
           new ECBlockReconstructedStripeInputStream(
               (ECReplicationConfig)repConfig, blockInfo, verifyChecksum,
-              xceiverFactory, refreshFunction, inputStreamFactory);
+              xceiverFactory, refreshFunction, inputStreamFactory,
+              byteBufferPool);
       if (failedLocations != null) {
         sis.addFailedDatanodes(failedLocations);
       }
       return new ECBlockReconstructedInputStream(
-          (ECReplicationConfig) repConfig, sis);
+          (ECReplicationConfig) repConfig, byteBufferPool, sis);
     } else {
       // Otherwise create the more efficient non-reconstruction reader
       return new ECBlockInputStream((ECReplicationConfig)repConfig, blockInfo,
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
index 6538458..d44ed15 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
 import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.io.ByteBufferPool;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -35,13 +36,16 @@ public class ECBlockReconstructedInputStream extends 
BlockExtendedInputStream {
   private ECReplicationConfig repConfig;
   private ECBlockReconstructedStripeInputStream stripeReader;
   private ByteBuffer[] bufs;
+  private final ByteBufferPool byteBufferPool;
   private boolean closed = false;
 
   private long position = 0;
 
   public ECBlockReconstructedInputStream(ECReplicationConfig repConfig,
+      ByteBufferPool byteBufferPool,
       ECBlockReconstructedStripeInputStream stripeReader) {
     this.repConfig = repConfig;
+    this.byteBufferPool = byteBufferPool;
     this.stripeReader = stripeReader;
 
     allocateBuffers();
@@ -138,6 +142,10 @@ public class ECBlockReconstructedInputStream extends 
BlockExtendedInputStream {
   @Override
   public synchronized void close() throws IOException {
     stripeReader.close();
+    for (int i = 0; i < bufs.length; i++) {
+      byteBufferPool.putBuffer(bufs[i]);
+      bufs[i] = null;
+    }
     closed = true;
   }
 
@@ -187,7 +195,7 @@ public class ECBlockReconstructedInputStream extends 
BlockExtendedInputStream {
   private void allocateBuffers() {
     bufs = new ByteBuffer[repConfig.getData()];
     for (int i = 0; i < repConfig.getData(); i++) {
-      bufs[i] = ByteBuffer.allocate(repConfig.getEcChunkSize());
+      bufs[i] = byteBufferPool.getBuffer(false, repConfig.getEcChunkSize());
       // Initially set the limit to 0 so there is no remaining space.
       bufs[i].limit(0);
     }
@@ -196,6 +204,11 @@ public class ECBlockReconstructedInputStream extends 
BlockExtendedInputStream {
   private void clearBuffers() {
     for (ByteBuffer b : bufs) {
       b.clear();
+      // As we are getting buffers from a bufferPool, we may get buffers with a
+      // capacity larger than what we asked for. After calling clear(), the
+      // buffer limit will become the capacity so we need to reset it back to
+      // the desired limit.
+      b.limit(repConfig.getEcChunkSize());
     }
   }
 
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
index 46dd3e1..e3fa0a7 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
 import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.ozone.erasurecode.CodecRegistry;
 import org.apache.ozone.erasurecode.rawcoder.RawErasureDecoder;
@@ -108,6 +109,7 @@ public class ECBlockReconstructedStripeInputStream extends 
ECBlockInputStream {
   private List<Integer> dataIndexes = new ArrayList<>();
   // Data Indexes we have tried to read from, and failed for some reason
   private Set<Integer> failedDataIndexes = new HashSet<>();
+  private ByteBufferPool byteBufferPool;
 
   private final RawErasureDecoder decoder;
 
@@ -118,9 +120,11 @@ public class ECBlockReconstructedStripeInputStream extends 
ECBlockInputStream {
   public ECBlockReconstructedStripeInputStream(ECReplicationConfig repConfig,
       OmKeyLocationInfo blockInfo, boolean verifyChecksum,
        XceiverClientFactory xceiverClientFactory, Function<BlockID,
-      Pipeline> refreshFunction, BlockInputStreamFactory streamFactory) {
+      Pipeline> refreshFunction, BlockInputStreamFactory streamFactory,
+      ByteBufferPool byteBufferPool) {
     super(repConfig, blockInfo, verifyChecksum, xceiverClientFactory,
         refreshFunction, streamFactory);
+    this.byteBufferPool = byteBufferPool;
 
     decoder = CodecRegistry.getInstance()
         .getCodecFactory(repConfig.getCodec().toString())
@@ -435,7 +439,8 @@ public class ECBlockReconstructedStripeInputStream extends 
ECBlockInputStream {
   }
 
   private ByteBuffer allocateBuffer(ECReplicationConfig repConfig) {
-    ByteBuffer buf = ByteBuffer.allocate(repConfig.getEcChunkSize());
+    ByteBuffer buf = byteBufferPool.getBuffer(
+        false, repConfig.getEcChunkSize());
     return buf;
   }
 
@@ -578,6 +583,17 @@ public class ECBlockReconstructedStripeInputStream extends 
ECBlockInputStream {
   public synchronized void close() {
     super.close();
     executor.shutdownNow();
+    // Inside this class, we only allocate buffers to read parity into. Data
+    // is reconstructed or read into a set of buffers passed in from the 
calling
+    // class. Therefore we only need to ensure we free the parity buffers here.
+    for (int i = getRepConfig().getData();
+         i < getRepConfig().getRequiredNodes(); i++) {
+      ByteBuffer buf = decoderInputBuffers[i];
+      if (buf != null) {
+        byteBufferPool.putBuffer(buf);
+        decoderInputBuffers[i] = null;
+      }
+    }
   }
 
   @Override
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 4e4886a..1e99210 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.io.ByteBufferPool;
-import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
@@ -56,7 +55,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
   private int ecChunkSize;
   private final int numDataBlks;
   private final int numParityBlks;
-  private static final ByteBufferPool BUFFER_POOL = new 
ElasticByteBufferPool();
+  private final ByteBufferPool bufferPool;
   private final RawErasureEncoder encoder;
 
   private enum StripeWriteStatus {
@@ -96,8 +95,9 @@ public class ECKeyOutputStream extends KeyOutputStream {
       XceiverClientFactory xceiverClientManager, OzoneManagerProtocol omClient,
       int chunkSize, String requestId, ECReplicationConfig replicationConfig,
       String uploadID, int partNumber, boolean isMultipart,
-      boolean unsafeByteBufferConversion) {
+      boolean unsafeByteBufferConversion, ByteBufferPool byteBufferPool) {
     this.config = config;
+    this.bufferPool = byteBufferPool;
     // For EC, cell/chunk size and buffer size can be same for now.
     ecChunkSize = replicationConfig.getEcChunkSize();
     this.config.setStreamBufferMaxSize(ecChunkSize);
@@ -105,8 +105,8 @@ public class ECKeyOutputStream extends KeyOutputStream {
     this.config.setStreamBufferSize(ecChunkSize);
     this.numDataBlks = replicationConfig.getData();
     this.numParityBlks = replicationConfig.getParity();
-    ecChunkBufferCache =
-        new ECChunkBuffers(ecChunkSize, numDataBlks, numParityBlks);
+    ecChunkBufferCache = new ECChunkBuffers(
+        ecChunkSize, numDataBlks, numParityBlks, bufferPool);
     OmKeyInfo info = handler.getKeyInfo();
     blockOutputStreamEntryPool =
         new ECBlockOutputStreamEntryPool(config, omClient, requestId,
@@ -209,7 +209,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
       failedDataStripeChunkLens[i] = dataBuffers[i].limit();
     }
     final ByteBuffer[] parityBuffers = ecChunkBufferCache.getParityBuffers();
-    for (int i = 0; i <  numParityBlks; i++) {
+    for (int i = 0; i < numParityBlks; i++) {
       failedParityStripeChunkLens[i] = parityBuffers[i].limit();
     }
 
@@ -259,7 +259,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
         blockOutputStreamEntryPool.getCurrentStreamEntry();
     newBlockGroupStreamEntry
         .updateBlockGroupToAckedPosition(failedStripeDataSize);
-    ecChunkBufferCache.clear(chunkSize);
+    ecChunkBufferCache.clear();
 
     if (newBlockGroupStreamEntry.getRemaining() <= 0) {
       // In most cases this should not happen except in the case stripe size 
and
@@ -316,7 +316,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
     if (hasPutBlockFailure()) {
       return StripeWriteStatus.FAILED;
     }
-    ecChunkBufferCache.clear(parityCellSize);
+    ecChunkBufferCache.clear();
 
     if (streamEntry.getRemaining() <= 0) {
       streamEntry.close();
@@ -364,11 +364,13 @@ public class ECKeyOutputStream extends KeyOutputStream {
 
   void writeParityCells(int parityCellSize) throws IOException {
     final ByteBuffer[] buffers = ecChunkBufferCache.getDataBuffers();
-    ecChunkBufferCache.allocateParityBuffers(parityCellSize);
     final ByteBuffer[] parityBuffers = ecChunkBufferCache.getParityBuffers();
 
-    for(int i=0; i< buffers.length; i++){
-      buffers[i].flip();
+    for (ByteBuffer b : parityBuffers) {
+      b.limit(parityCellSize);
+    }
+    for (ByteBuffer b : buffers) {
+      b.flip();
     }
     encoder.encode(buffers, parityBuffers);
     blockOutputStreamEntryPool
@@ -613,6 +615,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
     private boolean unsafeByteBufferConversion;
     private OzoneClientConfig clientConfig;
     private ECReplicationConfig replicationConfig;
+    private ByteBufferPool byteBufferPool;
 
     public Builder setMultipartUploadID(String uploadID) {
       this.multipartUploadID = uploadID;
@@ -670,10 +673,17 @@ public class ECKeyOutputStream extends KeyOutputStream {
       return this;
     }
 
+    public ECKeyOutputStream.Builder setByteBufferPool(
+        ByteBufferPool bufferPool) {
+      this.byteBufferPool = bufferPool;
+      return this;
+    }
+
     public ECKeyOutputStream build() {
       return new ECKeyOutputStream(clientConfig, openHandler, xceiverManager,
           omClient, chunkSize, requestID, replicationConfig, multipartUploadID,
-          multipartNumber, isMultipartKey, unsafeByteBufferConversion);
+          multipartNumber, isMultipartKey, unsafeByteBufferConversion,
+          byteBufferPool);
     }
   }
 
@@ -695,12 +705,16 @@ public class ECKeyOutputStream extends KeyOutputStream {
     private final ByteBuffer[] dataBuffers;
     private final ByteBuffer[] parityBuffers;
     private int cellSize;
+    private ByteBufferPool byteBufferPool;
 
-    ECChunkBuffers(int cellSize, int numData, int numParity) {
+    ECChunkBuffers(int cellSize, int numData, int numParity,
+        ByteBufferPool byteBufferPool) {
       this.cellSize = cellSize;
       dataBuffers = new ByteBuffer[numData];
       parityBuffers = new ByteBuffer[numParity];
-      allocateBuffers(cellSize, dataBuffers);
+      this.byteBufferPool = byteBufferPool;
+      allocateBuffers(dataBuffers, this.cellSize);
+      allocateBuffers(parityBuffers, this.cellSize);
     }
 
     private ByteBuffer[] getDataBuffers() {
@@ -711,10 +725,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
       return parityBuffers;
     }
 
-    public void allocateParityBuffers(int size){
-      allocateBuffers(size, parityBuffers);
-    }
-
     private int addToDataBuffer(int i, byte[] b, int off, int len) {
       final ByteBuffer buf = dataBuffers[i];
       final int pos = buf.position() + len;
@@ -724,9 +734,9 @@ public class ECKeyOutputStream extends KeyOutputStream {
       return pos;
     }
 
-    private void clear(int size) {
-      clearBuffers(size, dataBuffers);
-      clearBuffers(size, parityBuffers);
+    private void clear() {
+      clearBuffers(dataBuffers);
+      clearBuffers(parityBuffers);
     }
 
     private void release() {
@@ -734,24 +744,24 @@ public class ECKeyOutputStream extends KeyOutputStream {
       releaseBuffers(parityBuffers);
     }
 
-    private static void allocateBuffers(int cellSize, ByteBuffer[] buffers) {
+    private void allocateBuffers(ByteBuffer[] buffers, int bufferSize) {
       for (int i = 0; i < buffers.length; i++) {
-        buffers[i] = BUFFER_POOL.getBuffer(false, cellSize);
-        buffers[i].limit(cellSize);
+        buffers[i] = byteBufferPool.getBuffer(false, cellSize);
+        buffers[i].limit(bufferSize);
       }
     }
 
-    private static void clearBuffers(int cellSize, ByteBuffer[] buffers) {
+    private void clearBuffers(ByteBuffer[] buffers) {
       for (int i = 0; i < buffers.length; i++) {
         buffers[i].clear();
         buffers[i].limit(cellSize);
       }
     }
 
-    private static void releaseBuffers(ByteBuffer[] buffers) {
+    private void releaseBuffers(ByteBuffer[] buffers) {
       for (int i = 0; i < buffers.length; i++) {
         if (buffers[i] != null) {
-          BUFFER_POOL.putBuffer(buffers[i]);
+          byteBufferPool.putBuffer(buffers[i]);
           buffers[i] = null;
         }
       }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 8a5ab08..d043939 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -59,6 +59,8 @@ import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -74,6 +76,7 @@ import 
org.apache.hadoop.ozone.client.OzoneMultipartUploadList;
 import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
 import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
 import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
 import org.apache.hadoop.ozone.client.io.KeyInputStream;
@@ -170,6 +173,8 @@ public class RpcClient implements ClientProtocol {
   private final OzoneClientConfig clientConfig;
   private final Cache<URI, KeyProvider> keyProviderCache;
   private final boolean getLatestVersionLocation;
+  private final ByteBufferPool byteBufferPool;
+  private final BlockInputStreamFactory blockInputStreamFactory;
 
   /**
    * Creates RpcClient instance with the given configuration.
@@ -282,6 +287,9 @@ public class RpcClient implements ClientProtocol {
             }
           }
         }).build();
+    this.byteBufferPool = new ElasticByteBufferPool();
+    this.blockInputStreamFactory = BlockInputStreamFactoryImpl
+        .getInstance(byteBufferPool);
   }
 
   static boolean validateOmVersion(String expectedVersion,
@@ -887,7 +895,7 @@ public class RpcClient implements ClientProtocol {
     }
 
     OpenKeySession openKey = ozoneManagerClient.openKey(builder.build());
-    return createOutputStream(openKey, requestId, replicationConfig);
+    return createOutputStream(openKey, requestId);
   }
 
   private KeyProvider.KeyVersion getDEK(FileEncryptionInfo feInfo)
@@ -1348,8 +1356,7 @@ public class RpcClient implements ClientProtocol {
         .build();
     OpenKeySession keySession =
         ozoneManagerClient.createFile(keyArgs, overWrite, recursive);
-    return createOutputStream(keySession, UUID.randomUUID().toString(),
-        replicationConfig);
+    return createOutputStream(keySession, UUID.randomUUID().toString());
   }
 
   @Override
@@ -1429,7 +1436,7 @@ public class RpcClient implements ClientProtocol {
       LengthInputStream lengthInputStream = KeyInputStream
           .getFromOmKeyInfo(keyInfo, xceiverClientManager,
               clientConfig.isChecksumVerify(), retryFunction,
-              BlockInputStreamFactoryImpl.getInstance());
+              blockInputStreamFactory);
       try {
         Map< String, String > keyInfoMetadata = keyInfo.getMetadata();
         if (Boolean.valueOf(keyInfoMetadata.get(OzoneConsts.GDPR_FLAG))) {
@@ -1450,7 +1457,7 @@ public class RpcClient implements ClientProtocol {
       LengthInputStream lengthInputStream = KeyInputStream
           .getFromOmKeyInfo(keyInfo, xceiverClientManager,
               clientConfig.isChecksumVerify(), retryFunction,
-              BlockInputStreamFactoryImpl.getInstance());
+              blockInputStreamFactory);
       final KeyProvider.KeyVersion decrypted = getDEK(feInfo);
       final CryptoInputStream cryptoIn =
           new CryptoInputStream(lengthInputStream.getWrappedStream(),
@@ -1462,7 +1469,7 @@ public class RpcClient implements ClientProtocol {
       List<LengthInputStream> lengthInputStreams = KeyInputStream
           .getStreamsFromKeyInfo(keyInfo, xceiverClientManager,
               clientConfig.isChecksumVerify(), retryFunction,
-              BlockInputStreamFactoryImpl.getInstance());
+              blockInputStreamFactory);
       final KeyProvider.KeyVersion decrypted = getDEK(feInfo);
 
       List<OzoneCryptoInputStream> cryptoInputStreams = new ArrayList<>();
@@ -1479,8 +1486,7 @@ public class RpcClient implements ClientProtocol {
   }
 
   private OzoneOutputStream createOutputStream(OpenKeySession openKey,
-      String requestId, ReplicationConfig replicationConfig)
-      throws IOException {
+      String requestId) throws IOException {
     KeyOutputStream keyOutputStream = null;
 
     if (openKey.getKeyInfo().getReplicationConfig()
@@ -1491,7 +1497,9 @@ public class RpcClient implements ClientProtocol {
           .setReplicationConfig(
               (ECReplicationConfig)openKey.getKeyInfo().getReplicationConfig())
           .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
-          .setConfig(clientConfig).build();
+          .setConfig(clientConfig)
+          .setByteBufferPool(byteBufferPool)
+          .build();
     } else {
       keyOutputStream = new KeyOutputStream.Builder().setHandler(openKey)
           .setXceiverClientManager(xceiverClientManager)
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
index c1c0574..bd5ddd0 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.client.rpc.read;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.ozone.client.io.ECBlockReconstructedInputStream;
 import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -47,6 +49,7 @@ public class TestECBlockReconstructedInputStream {
   private long randomSeed;
   private ThreadLocalRandom random = ThreadLocalRandom.current();
   private SplittableRandom dataGenerator;
+  private ByteBufferPool bufferPool = new ElasticByteBufferPool();
 
   @Before
   public void setup() throws IOException {
@@ -63,7 +66,7 @@ public class TestECBlockReconstructedInputStream {
         ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
     streamFactory.setCurrentPipeline(keyInfo.getPipeline());
     return new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
-        null, null, streamFactory);
+        null, null, streamFactory, bufferPool);
   }
 
   @Test
@@ -73,7 +76,8 @@ public class TestECBlockReconstructedInputStream {
     try(ECBlockReconstructedStripeInputStream stripeStream
         = createStripeInputStream(dnMap, 12345L)) {
       try (ECBlockReconstructedInputStream stream =
-          new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+          new ECBlockReconstructedInputStream(repConfig, bufferPool,
+              stripeStream)) {
         Assert.assertEquals(12345L, stream.getLength());
       }
     }
@@ -86,7 +90,8 @@ public class TestECBlockReconstructedInputStream {
     try(ECBlockReconstructedStripeInputStream stripeStream
             = createStripeInputStream(dnMap, 12345L)) {
       try (ECBlockReconstructedInputStream stream =
-               new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+          new ECBlockReconstructedInputStream(repConfig, bufferPool,
+              stripeStream)) {
         Assert.assertEquals(new BlockID(1, 1), stream.getBlockID());
       }
     }
@@ -110,7 +115,8 @@ public class TestECBlockReconstructedInputStream {
     try(ECBlockReconstructedStripeInputStream stripeStream
             = createStripeInputStream(dnMap, blockLength)) {
       try (ECBlockReconstructedInputStream stream =
-               new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+          new ECBlockReconstructedInputStream(repConfig, bufferPool,
+              stripeStream)) {
         ByteBuffer b = ByteBuffer.allocate(readBufferSize);
         int totalRead = 0;
         dataGenerator = new SplittableRandom(randomSeed);
@@ -147,7 +153,8 @@ public class TestECBlockReconstructedInputStream {
     try(ECBlockReconstructedStripeInputStream stripeStream
             = createStripeInputStream(dnMap, blockLength)) {
       try (ECBlockReconstructedInputStream stream =
-               new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+          new ECBlockReconstructedInputStream(repConfig, bufferPool,
+              stripeStream)) {
         ByteBuffer b = ByteBuffer.allocate(readBufferSize);
         dataGenerator = new SplittableRandom(randomSeed);
         long read = stream.read(b);
@@ -178,7 +185,8 @@ public class TestECBlockReconstructedInputStream {
     try(ECBlockReconstructedStripeInputStream stripeStream
             = createStripeInputStream(dnMap, blockLength)) {
       try (ECBlockReconstructedInputStream stream =
-               new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+          new ECBlockReconstructedInputStream(repConfig, bufferPool,
+              stripeStream)) {
 
         dataGenerator = new SplittableRandom(randomSeed);
         int totalRead = 0;
@@ -213,7 +221,8 @@ public class TestECBlockReconstructedInputStream {
     try(ECBlockReconstructedStripeInputStream stripeStream
             = createStripeInputStream(dnMap, blockLength)) {
       try (ECBlockReconstructedInputStream stream =
-               new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+          new ECBlockReconstructedInputStream(repConfig, bufferPool,
+              stripeStream)) {
         int totalRead = 0;
         dataGenerator = new SplittableRandom(randomSeed);
         while (totalRead < blockLength) {
@@ -249,7 +258,8 @@ public class TestECBlockReconstructedInputStream {
     try(ECBlockReconstructedStripeInputStream stripeStream
             = createStripeInputStream(dnMap, blockLength)) {
       try (ECBlockReconstructedInputStream stream =
-               new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+          new ECBlockReconstructedInputStream(repConfig, bufferPool,
+              stripeStream)) {
         ByteBuffer b = ByteBuffer.allocate(readBufferSize);
 
         int seekPosition = 0;
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
index 589ceac..079e944 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.client.rpc.read;
 
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
 import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
 import org.apache.hadoop.ozone.client.io.InsufficientLocationsException;
@@ -52,6 +54,7 @@ public class TestECBlockReconstructedStripeInputStream {
   private long randomSeed;
   private ThreadLocalRandom random = ThreadLocalRandom.current();
   private SplittableRandom dataGen;
+  private ByteBufferPool bufferPool = new ElasticByteBufferPool();
 
   @Before
   public void setup() {
@@ -68,28 +71,21 @@ public class TestECBlockReconstructedStripeInputStream {
     // One chunk, only 1 location.
     OmKeyLocationInfo keyInfo = ECStreamTestUtil
         .createKeyInfo(repConfig, 1, ONEMB);
-    try (ECBlockInputStream ecb =
-        new ECBlockReconstructedStripeInputStream(repConfig,
-        keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+    try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
       Assert.assertTrue(ecb.hasSufficientLocations());
     }
     // Two Chunks, but missing data block 2.
     Map<DatanodeDetails, Integer> dnMap
         = ECStreamTestUtil.createIndexMap(1, 4, 5);
     keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 2, dnMap);
-    try (ECBlockInputStream ecb =
-        new ECBlockReconstructedStripeInputStream(repConfig,
-        keyInfo, true, null, null,
-            new ECStreamTestUtil.TestBlockInputStreamFactory())) {
+    try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
       Assert.assertTrue(ecb.hasSufficientLocations());
     }
 
     // Three Chunks, but missing data block 2 and 3.
     dnMap = ECStreamTestUtil.createIndexMap(1, 4, 5);
     keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 3, dnMap);
-    try (ECBlockInputStream ecb =
-        new ECBlockReconstructedStripeInputStream(repConfig,
-        keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+    try (ECBlockInputStream ecb =  createInputStream(keyInfo)) {
       Assert.assertTrue(ecb.hasSufficientLocations());
       // Set a failed location
       List<DatanodeDetails> failed = new ArrayList<>();
@@ -101,18 +97,14 @@ public class TestECBlockReconstructedStripeInputStream {
     // Three Chunks, but missing data block 2 and 3 and parity 1.
     dnMap = ECStreamTestUtil.createIndexMap(1, 4);
     keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 3, dnMap);
-    try (ECBlockInputStream ecb =
-        new ECBlockReconstructedStripeInputStream(repConfig,
-        keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+    try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
       Assert.assertFalse(ecb.hasSufficientLocations());
     }
 
     // Three Chunks, all available but fail 3
     dnMap = ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
     keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 3, dnMap);
-    try (ECBlockInputStream ecb =
-        new ECBlockReconstructedStripeInputStream(repConfig,
-        keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+    try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
       Assert.assertTrue(ecb.hasSufficientLocations());
       // Set a failed location
       List<DatanodeDetails> failed = new ArrayList<>();
@@ -161,8 +153,7 @@ public class TestECBlockReconstructedStripeInputStream {
       ByteBuffer[] bufs = allocateByteBuffers(repConfig);
       dataGen = new SplittableRandom(randomSeed);
       try (ECBlockReconstructedStripeInputStream ecb =
-          new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
-              null, null, streamFactory)) {
+          createInputStream(keyInfo)) {
         // Read 3 full stripes
         for (int i = 0; i < 3; i++) {
           int read = ecb.readStripe(bufs);
@@ -215,8 +206,7 @@ public class TestECBlockReconstructedStripeInputStream {
     streamFactory.setCurrentPipeline(keyInfo.getPipeline());
     dataGen = new SplittableRandom(randomSeed);
     try (ECBlockReconstructedStripeInputStream ecb =
-        new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
-            null, null, streamFactory)) {
+        createInputStream(keyInfo)) {
       int read = ecb.readStripe(bufs);
       Assert.assertEquals(blockLength, read);
       ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
@@ -258,8 +248,7 @@ public class TestECBlockReconstructedStripeInputStream {
     streamFactory.setCurrentPipeline(keyInfo.getPipeline());
     dataGen = new SplittableRandom(randomSeed);
     try (ECBlockReconstructedStripeInputStream ecb =
-        new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
-            null, null, streamFactory)) {
+        createInputStream(keyInfo)) {
       int read = ecb.readStripe(bufs);
       Assert.assertEquals(blockLength, read);
       ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
@@ -316,8 +305,7 @@ public class TestECBlockReconstructedStripeInputStream {
       streamFactory.setCurrentPipeline(keyInfo.getPipeline());
       dataGen = new SplittableRandom(randomSeed);
       try (ECBlockReconstructedStripeInputStream ecb =
-          new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
-              null, null, streamFactory)) {
+          createInputStream(keyInfo)) {
         int read = ecb.readStripe(bufs);
         Assert.assertEquals(blockLength, read);
         ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
@@ -359,8 +347,7 @@ public class TestECBlockReconstructedStripeInputStream {
         ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
     streamFactory.setCurrentPipeline(keyInfo.getPipeline());
     try (ECBlockReconstructedStripeInputStream ecb =
-             new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, 
true,
-                 null, null, streamFactory)) {
+        createInputStream(keyInfo)) {
       try {
         ecb.readStripe(bufs);
         Assert.fail("Read should have thrown an exception");
@@ -404,8 +391,8 @@ public class TestECBlockReconstructedStripeInputStream {
 
       ByteBuffer[] bufs = allocateByteBuffers(repConfig);
       try (ECBlockReconstructedStripeInputStream ecb =
-               new ECBlockReconstructedStripeInputStream(repConfig, keyInfo,
-                   true, null, null, streamFactory)) {
+          createInputStream(keyInfo)) {
+
         // Read Stripe 1
         int read = ecb.readStripe(bufs);
         for (int j = 0; j < bufs.length; j++) {
@@ -457,8 +444,7 @@ public class TestECBlockReconstructedStripeInputStream {
     streamFactory.setCurrentPipeline(keyInfo.getPipeline());
 
     try (ECBlockReconstructedStripeInputStream ecb =
-             new ECBlockReconstructedStripeInputStream(repConfig, keyInfo,
-                 true, null, null, streamFactory)) {
+        createInputStream(keyInfo)) {
       try {
         ecb.seek(10);
         Assert.fail("Seek should have thrown an exception");
@@ -503,8 +489,7 @@ public class TestECBlockReconstructedStripeInputStream {
 
       ByteBuffer[] bufs = allocateByteBuffers(repConfig);
       try (ECBlockReconstructedStripeInputStream ecb =
-          new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
-                   null, null, streamFactory)) {
+          createInputStream(keyInfo)) {
         // After reading the first stripe, make one of the streams error
         for (int i = 0; i < 3; i++) {
           int read = ecb.readStripe(bufs);
@@ -565,8 +550,7 @@ public class TestECBlockReconstructedStripeInputStream {
     ByteBuffer[] bufs = allocateByteBuffers(repConfig);
     dataGen = new SplittableRandom(randomSeed);
     try (ECBlockReconstructedStripeInputStream ecb =
-        new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
-            null, null, streamFactory)) {
+        createInputStream(keyInfo)) {
       List<DatanodeDetails> failed = new ArrayList<>();
       // Set the first 3 DNs as failed
       for (Map.Entry<DatanodeDetails, Integer> e : dnMap.entrySet()) {
@@ -592,6 +576,12 @@ public class TestECBlockReconstructedStripeInputStream {
     }
   }
 
+  private ECBlockReconstructedStripeInputStream createInputStream(
+      OmKeyLocationInfo keyInfo) {
+    return new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
+        null, null, streamFactory, bufferPool);
+  }
+
   private List<Integer> indexesToList(int... indexes) {
     List<Integer> list = new ArrayList<>();
     for (int i : indexes) {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to