This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new 3eb55db HDDS-5949. EC: Create reusable buffer pool shared by all EC
input and output streams (#2929)
3eb55db is described below
commit 3eb55db0148f06686abbde49629ff21bceb6d2a9
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Mon Jan 10 21:13:39 2022 +0000
HDDS-5949. EC: Create reusable buffer pool shared by all EC input and
output streams (#2929)
---
.../client/io/BlockInputStreamFactoryImpl.java | 13 ++++-
.../client/io/ECBlockInputStreamFactoryImpl.java | 15 +++--
.../client/io/ECBlockReconstructedInputStream.java | 15 ++++-
.../io/ECBlockReconstructedStripeInputStream.java | 20 ++++++-
.../hadoop/ozone/client/io/ECKeyOutputStream.java | 64 +++++++++++++---------
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 26 ++++++---
.../read/TestECBlockReconstructedInputStream.java | 26 ++++++---
.../TestECBlockReconstructedStripeInputStream.java | 58 ++++++++------------
8 files changed, 148 insertions(+), 89 deletions(-)
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
index 6cdb991..1d46acd 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.security.token.Token;
@@ -38,13 +40,18 @@ public class BlockInputStreamFactoryImpl implements
BlockInputStreamFactory {
private ECBlockInputStreamFactory ecBlockStreamFactory;
- public static BlockInputStreamFactory getInstance() {
- return new BlockInputStreamFactoryImpl();
+ public static BlockInputStreamFactory getInstance(
+ ByteBufferPool byteBufferPool) {
+ return new BlockInputStreamFactoryImpl(byteBufferPool);
}
public BlockInputStreamFactoryImpl() {
+ this(new ElasticByteBufferPool());
+ }
+
+ public BlockInputStreamFactoryImpl(ByteBufferPool byteBufferPool) {
this.ecBlockStreamFactory =
- ECBlockInputStreamFactoryImpl.getInstance(this);
+ ECBlockInputStreamFactoryImpl.getInstance(this, byteBufferPool);
}
/**
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
index 75956c4..cc03e80 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import java.util.List;
@@ -36,13 +37,16 @@ public final class ECBlockInputStreamFactoryImpl implements
ECBlockInputStreamFactory {
private final BlockInputStreamFactory inputStreamFactory;
+ private final ByteBufferPool byteBufferPool;
public static ECBlockInputStreamFactory getInstance(
- BlockInputStreamFactory streamFactory) {
- return new ECBlockInputStreamFactoryImpl(streamFactory);
+ BlockInputStreamFactory streamFactory, ByteBufferPool byteBufferPool) {
+ return new ECBlockInputStreamFactoryImpl(streamFactory, byteBufferPool);
}
- private ECBlockInputStreamFactoryImpl(BlockInputStreamFactory streamFactory)
{
+ private ECBlockInputStreamFactoryImpl(BlockInputStreamFactory streamFactory,
+ ByteBufferPool byteBufferPool) {
+ this.byteBufferPool = byteBufferPool;
this.inputStreamFactory = streamFactory;
}
@@ -72,12 +76,13 @@ public final class ECBlockInputStreamFactoryImpl implements
ECBlockReconstructedStripeInputStream sis =
new ECBlockReconstructedStripeInputStream(
(ECReplicationConfig)repConfig, blockInfo, verifyChecksum,
- xceiverFactory, refreshFunction, inputStreamFactory);
+ xceiverFactory, refreshFunction, inputStreamFactory,
+ byteBufferPool);
if (failedLocations != null) {
sis.addFailedDatanodes(failedLocations);
}
return new ECBlockReconstructedInputStream(
- (ECReplicationConfig) repConfig, sis);
+ (ECReplicationConfig) repConfig, byteBufferPool, sis);
} else {
// Otherwise create the more efficient non-reconstruction reader
return new ECBlockInputStream((ECReplicationConfig)repConfig, blockInfo,
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
index 6538458..d44ed15 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.io.ByteBufferPool;
import java.io.EOFException;
import java.io.IOException;
@@ -35,13 +36,16 @@ public class ECBlockReconstructedInputStream extends
BlockExtendedInputStream {
private ECReplicationConfig repConfig;
private ECBlockReconstructedStripeInputStream stripeReader;
private ByteBuffer[] bufs;
+ private final ByteBufferPool byteBufferPool;
private boolean closed = false;
private long position = 0;
public ECBlockReconstructedInputStream(ECReplicationConfig repConfig,
+ ByteBufferPool byteBufferPool,
ECBlockReconstructedStripeInputStream stripeReader) {
this.repConfig = repConfig;
+ this.byteBufferPool = byteBufferPool;
this.stripeReader = stripeReader;
allocateBuffers();
@@ -138,6 +142,10 @@ public class ECBlockReconstructedInputStream extends
BlockExtendedInputStream {
@Override
public synchronized void close() throws IOException {
stripeReader.close();
+ for (int i = 0; i < bufs.length; i++) {
+ byteBufferPool.putBuffer(bufs[i]);
+ bufs[i] = null;
+ }
closed = true;
}
@@ -187,7 +195,7 @@ public class ECBlockReconstructedInputStream extends
BlockExtendedInputStream {
private void allocateBuffers() {
bufs = new ByteBuffer[repConfig.getData()];
for (int i = 0; i < repConfig.getData(); i++) {
- bufs[i] = ByteBuffer.allocate(repConfig.getEcChunkSize());
+ bufs[i] = byteBufferPool.getBuffer(false, repConfig.getEcChunkSize());
// Initially set the limit to 0 so there is no remaining space.
bufs[i].limit(0);
}
@@ -196,6 +204,11 @@ public class ECBlockReconstructedInputStream extends
BlockExtendedInputStream {
private void clearBuffers() {
for (ByteBuffer b : bufs) {
b.clear();
+ // As we are getting buffers from a bufferPool, we may get buffers with a
+ // capacity larger than what we asked for. After calling clear(), the
+ // buffer limit will become the capacity so we need to reset it back to
+ // the desired limit.
+ b.limit(repConfig.getEcChunkSize());
}
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
index 46dd3e1..e3fa0a7 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.ozone.erasurecode.CodecRegistry;
import org.apache.ozone.erasurecode.rawcoder.RawErasureDecoder;
@@ -108,6 +109,7 @@ public class ECBlockReconstructedStripeInputStream extends
ECBlockInputStream {
private List<Integer> dataIndexes = new ArrayList<>();
// Data Indexes we have tried to read from, and failed for some reason
private Set<Integer> failedDataIndexes = new HashSet<>();
+ private ByteBufferPool byteBufferPool;
private final RawErasureDecoder decoder;
@@ -118,9 +120,11 @@ public class ECBlockReconstructedStripeInputStream extends
ECBlockInputStream {
public ECBlockReconstructedStripeInputStream(ECReplicationConfig repConfig,
OmKeyLocationInfo blockInfo, boolean verifyChecksum,
XceiverClientFactory xceiverClientFactory, Function<BlockID,
- Pipeline> refreshFunction, BlockInputStreamFactory streamFactory) {
+ Pipeline> refreshFunction, BlockInputStreamFactory streamFactory,
+ ByteBufferPool byteBufferPool) {
super(repConfig, blockInfo, verifyChecksum, xceiverClientFactory,
refreshFunction, streamFactory);
+ this.byteBufferPool = byteBufferPool;
decoder = CodecRegistry.getInstance()
.getCodecFactory(repConfig.getCodec().toString())
@@ -435,7 +439,8 @@ public class ECBlockReconstructedStripeInputStream extends
ECBlockInputStream {
}
private ByteBuffer allocateBuffer(ECReplicationConfig repConfig) {
- ByteBuffer buf = ByteBuffer.allocate(repConfig.getEcChunkSize());
+ ByteBuffer buf = byteBufferPool.getBuffer(
+ false, repConfig.getEcChunkSize());
return buf;
}
@@ -578,6 +583,17 @@ public class ECBlockReconstructedStripeInputStream extends
ECBlockInputStream {
public synchronized void close() {
super.close();
executor.shutdownNow();
+ // Inside this class, we only allocate buffers to read parity into. Data
+ // is reconstructed or read into a set of buffers passed in from the
calling
+ // class. Therefore we only need to ensure we free the parity buffers here.
+ for (int i = getRepConfig().getData();
+ i < getRepConfig().getRequiredNodes(); i++) {
+ ByteBuffer buf = decoderInputBuffers[i];
+ if (buf != null) {
+ byteBufferPool.putBuffer(buf);
+ decoderInputBuffers[i] = null;
+ }
+ }
}
@Override
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 4e4886a..1e99210 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
import org.apache.hadoop.io.ByteBufferPool;
-import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
@@ -56,7 +55,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
private int ecChunkSize;
private final int numDataBlks;
private final int numParityBlks;
- private static final ByteBufferPool BUFFER_POOL = new
ElasticByteBufferPool();
+ private final ByteBufferPool bufferPool;
private final RawErasureEncoder encoder;
private enum StripeWriteStatus {
@@ -96,8 +95,9 @@ public class ECKeyOutputStream extends KeyOutputStream {
XceiverClientFactory xceiverClientManager, OzoneManagerProtocol omClient,
int chunkSize, String requestId, ECReplicationConfig replicationConfig,
String uploadID, int partNumber, boolean isMultipart,
- boolean unsafeByteBufferConversion) {
+ boolean unsafeByteBufferConversion, ByteBufferPool byteBufferPool) {
this.config = config;
+ this.bufferPool = byteBufferPool;
// For EC, cell/chunk size and buffer size can be same for now.
ecChunkSize = replicationConfig.getEcChunkSize();
this.config.setStreamBufferMaxSize(ecChunkSize);
@@ -105,8 +105,8 @@ public class ECKeyOutputStream extends KeyOutputStream {
this.config.setStreamBufferSize(ecChunkSize);
this.numDataBlks = replicationConfig.getData();
this.numParityBlks = replicationConfig.getParity();
- ecChunkBufferCache =
- new ECChunkBuffers(ecChunkSize, numDataBlks, numParityBlks);
+ ecChunkBufferCache = new ECChunkBuffers(
+ ecChunkSize, numDataBlks, numParityBlks, bufferPool);
OmKeyInfo info = handler.getKeyInfo();
blockOutputStreamEntryPool =
new ECBlockOutputStreamEntryPool(config, omClient, requestId,
@@ -209,7 +209,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
failedDataStripeChunkLens[i] = dataBuffers[i].limit();
}
final ByteBuffer[] parityBuffers = ecChunkBufferCache.getParityBuffers();
- for (int i = 0; i < numParityBlks; i++) {
+ for (int i = 0; i < numParityBlks; i++) {
failedParityStripeChunkLens[i] = parityBuffers[i].limit();
}
@@ -259,7 +259,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
blockOutputStreamEntryPool.getCurrentStreamEntry();
newBlockGroupStreamEntry
.updateBlockGroupToAckedPosition(failedStripeDataSize);
- ecChunkBufferCache.clear(chunkSize);
+ ecChunkBufferCache.clear();
if (newBlockGroupStreamEntry.getRemaining() <= 0) {
// In most cases this should not happen except in the case stripe size
and
@@ -316,7 +316,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
if (hasPutBlockFailure()) {
return StripeWriteStatus.FAILED;
}
- ecChunkBufferCache.clear(parityCellSize);
+ ecChunkBufferCache.clear();
if (streamEntry.getRemaining() <= 0) {
streamEntry.close();
@@ -364,11 +364,13 @@ public class ECKeyOutputStream extends KeyOutputStream {
void writeParityCells(int parityCellSize) throws IOException {
final ByteBuffer[] buffers = ecChunkBufferCache.getDataBuffers();
- ecChunkBufferCache.allocateParityBuffers(parityCellSize);
final ByteBuffer[] parityBuffers = ecChunkBufferCache.getParityBuffers();
- for(int i=0; i< buffers.length; i++){
- buffers[i].flip();
+ for (ByteBuffer b : parityBuffers) {
+ b.limit(parityCellSize);
+ }
+ for (ByteBuffer b : buffers) {
+ b.flip();
}
encoder.encode(buffers, parityBuffers);
blockOutputStreamEntryPool
@@ -613,6 +615,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
private boolean unsafeByteBufferConversion;
private OzoneClientConfig clientConfig;
private ECReplicationConfig replicationConfig;
+ private ByteBufferPool byteBufferPool;
public Builder setMultipartUploadID(String uploadID) {
this.multipartUploadID = uploadID;
@@ -670,10 +673,17 @@ public class ECKeyOutputStream extends KeyOutputStream {
return this;
}
+ public ECKeyOutputStream.Builder setByteBufferPool(
+ ByteBufferPool bufferPool) {
+ this.byteBufferPool = bufferPool;
+ return this;
+ }
+
public ECKeyOutputStream build() {
return new ECKeyOutputStream(clientConfig, openHandler, xceiverManager,
omClient, chunkSize, requestID, replicationConfig, multipartUploadID,
- multipartNumber, isMultipartKey, unsafeByteBufferConversion);
+ multipartNumber, isMultipartKey, unsafeByteBufferConversion,
+ byteBufferPool);
}
}
@@ -695,12 +705,16 @@ public class ECKeyOutputStream extends KeyOutputStream {
private final ByteBuffer[] dataBuffers;
private final ByteBuffer[] parityBuffers;
private int cellSize;
+ private ByteBufferPool byteBufferPool;
- ECChunkBuffers(int cellSize, int numData, int numParity) {
+ ECChunkBuffers(int cellSize, int numData, int numParity,
+ ByteBufferPool byteBufferPool) {
this.cellSize = cellSize;
dataBuffers = new ByteBuffer[numData];
parityBuffers = new ByteBuffer[numParity];
- allocateBuffers(cellSize, dataBuffers);
+ this.byteBufferPool = byteBufferPool;
+ allocateBuffers(dataBuffers, this.cellSize);
+ allocateBuffers(parityBuffers, this.cellSize);
}
private ByteBuffer[] getDataBuffers() {
@@ -711,10 +725,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
return parityBuffers;
}
- public void allocateParityBuffers(int size){
- allocateBuffers(size, parityBuffers);
- }
-
private int addToDataBuffer(int i, byte[] b, int off, int len) {
final ByteBuffer buf = dataBuffers[i];
final int pos = buf.position() + len;
@@ -724,9 +734,9 @@ public class ECKeyOutputStream extends KeyOutputStream {
return pos;
}
- private void clear(int size) {
- clearBuffers(size, dataBuffers);
- clearBuffers(size, parityBuffers);
+ private void clear() {
+ clearBuffers(dataBuffers);
+ clearBuffers(parityBuffers);
}
private void release() {
@@ -734,24 +744,24 @@ public class ECKeyOutputStream extends KeyOutputStream {
releaseBuffers(parityBuffers);
}
- private static void allocateBuffers(int cellSize, ByteBuffer[] buffers) {
+ private void allocateBuffers(ByteBuffer[] buffers, int bufferSize) {
for (int i = 0; i < buffers.length; i++) {
- buffers[i] = BUFFER_POOL.getBuffer(false, cellSize);
- buffers[i].limit(cellSize);
+ buffers[i] = byteBufferPool.getBuffer(false, cellSize);
+ buffers[i].limit(bufferSize);
}
}
- private static void clearBuffers(int cellSize, ByteBuffer[] buffers) {
+ private void clearBuffers(ByteBuffer[] buffers) {
for (int i = 0; i < buffers.length; i++) {
buffers[i].clear();
buffers[i].limit(cellSize);
}
}
- private static void releaseBuffers(ByteBuffer[] buffers) {
+ private void releaseBuffers(ByteBuffer[] buffers) {
for (int i = 0; i < buffers.length; i++) {
if (buffers[i] != null) {
- BUFFER_POOL.putBuffer(buffers[i]);
+ byteBufferPool.putBuffer(buffers[i]);
buffers[i] = null;
}
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 8a5ab08..d043939 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -59,6 +59,8 @@ import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -74,6 +76,7 @@ import
org.apache.hadoop.ozone.client.OzoneMultipartUploadList;
import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
import org.apache.hadoop.ozone.client.io.KeyInputStream;
@@ -170,6 +173,8 @@ public class RpcClient implements ClientProtocol {
private final OzoneClientConfig clientConfig;
private final Cache<URI, KeyProvider> keyProviderCache;
private final boolean getLatestVersionLocation;
+ private final ByteBufferPool byteBufferPool;
+ private final BlockInputStreamFactory blockInputStreamFactory;
/**
* Creates RpcClient instance with the given configuration.
@@ -282,6 +287,9 @@ public class RpcClient implements ClientProtocol {
}
}
}).build();
+ this.byteBufferPool = new ElasticByteBufferPool();
+ this.blockInputStreamFactory = BlockInputStreamFactoryImpl
+ .getInstance(byteBufferPool);
}
static boolean validateOmVersion(String expectedVersion,
@@ -887,7 +895,7 @@ public class RpcClient implements ClientProtocol {
}
OpenKeySession openKey = ozoneManagerClient.openKey(builder.build());
- return createOutputStream(openKey, requestId, replicationConfig);
+ return createOutputStream(openKey, requestId);
}
private KeyProvider.KeyVersion getDEK(FileEncryptionInfo feInfo)
@@ -1348,8 +1356,7 @@ public class RpcClient implements ClientProtocol {
.build();
OpenKeySession keySession =
ozoneManagerClient.createFile(keyArgs, overWrite, recursive);
- return createOutputStream(keySession, UUID.randomUUID().toString(),
- replicationConfig);
+ return createOutputStream(keySession, UUID.randomUUID().toString());
}
@Override
@@ -1429,7 +1436,7 @@ public class RpcClient implements ClientProtocol {
LengthInputStream lengthInputStream = KeyInputStream
.getFromOmKeyInfo(keyInfo, xceiverClientManager,
clientConfig.isChecksumVerify(), retryFunction,
- BlockInputStreamFactoryImpl.getInstance());
+ blockInputStreamFactory);
try {
Map< String, String > keyInfoMetadata = keyInfo.getMetadata();
if (Boolean.valueOf(keyInfoMetadata.get(OzoneConsts.GDPR_FLAG))) {
@@ -1450,7 +1457,7 @@ public class RpcClient implements ClientProtocol {
LengthInputStream lengthInputStream = KeyInputStream
.getFromOmKeyInfo(keyInfo, xceiverClientManager,
clientConfig.isChecksumVerify(), retryFunction,
- BlockInputStreamFactoryImpl.getInstance());
+ blockInputStreamFactory);
final KeyProvider.KeyVersion decrypted = getDEK(feInfo);
final CryptoInputStream cryptoIn =
new CryptoInputStream(lengthInputStream.getWrappedStream(),
@@ -1462,7 +1469,7 @@ public class RpcClient implements ClientProtocol {
List<LengthInputStream> lengthInputStreams = KeyInputStream
.getStreamsFromKeyInfo(keyInfo, xceiverClientManager,
clientConfig.isChecksumVerify(), retryFunction,
- BlockInputStreamFactoryImpl.getInstance());
+ blockInputStreamFactory);
final KeyProvider.KeyVersion decrypted = getDEK(feInfo);
List<OzoneCryptoInputStream> cryptoInputStreams = new ArrayList<>();
@@ -1479,8 +1486,7 @@ public class RpcClient implements ClientProtocol {
}
private OzoneOutputStream createOutputStream(OpenKeySession openKey,
- String requestId, ReplicationConfig replicationConfig)
- throws IOException {
+ String requestId) throws IOException {
KeyOutputStream keyOutputStream = null;
if (openKey.getKeyInfo().getReplicationConfig()
@@ -1491,7 +1497,9 @@ public class RpcClient implements ClientProtocol {
.setReplicationConfig(
(ECReplicationConfig)openKey.getKeyInfo().getReplicationConfig())
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
- .setConfig(clientConfig).build();
+ .setConfig(clientConfig)
+ .setByteBufferPool(byteBufferPool)
+ .build();
} else {
keyOutputStream = new KeyOutputStream.Builder().setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
index c1c0574..bd5ddd0 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.client.rpc.read;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.ozone.client.io.ECBlockReconstructedInputStream;
import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -47,6 +49,7 @@ public class TestECBlockReconstructedInputStream {
private long randomSeed;
private ThreadLocalRandom random = ThreadLocalRandom.current();
private SplittableRandom dataGenerator;
+ private ByteBufferPool bufferPool = new ElasticByteBufferPool();
@Before
public void setup() throws IOException {
@@ -63,7 +66,7 @@ public class TestECBlockReconstructedInputStream {
ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
return new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
- null, null, streamFactory);
+ null, null, streamFactory, bufferPool);
}
@Test
@@ -73,7 +76,8 @@ public class TestECBlockReconstructedInputStream {
try(ECBlockReconstructedStripeInputStream stripeStream
= createStripeInputStream(dnMap, 12345L)) {
try (ECBlockReconstructedInputStream stream =
- new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+ new ECBlockReconstructedInputStream(repConfig, bufferPool,
+ stripeStream)) {
Assert.assertEquals(12345L, stream.getLength());
}
}
@@ -86,7 +90,8 @@ public class TestECBlockReconstructedInputStream {
try(ECBlockReconstructedStripeInputStream stripeStream
= createStripeInputStream(dnMap, 12345L)) {
try (ECBlockReconstructedInputStream stream =
- new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+ new ECBlockReconstructedInputStream(repConfig, bufferPool,
+ stripeStream)) {
Assert.assertEquals(new BlockID(1, 1), stream.getBlockID());
}
}
@@ -110,7 +115,8 @@ public class TestECBlockReconstructedInputStream {
try(ECBlockReconstructedStripeInputStream stripeStream
= createStripeInputStream(dnMap, blockLength)) {
try (ECBlockReconstructedInputStream stream =
- new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+ new ECBlockReconstructedInputStream(repConfig, bufferPool,
+ stripeStream)) {
ByteBuffer b = ByteBuffer.allocate(readBufferSize);
int totalRead = 0;
dataGenerator = new SplittableRandom(randomSeed);
@@ -147,7 +153,8 @@ public class TestECBlockReconstructedInputStream {
try(ECBlockReconstructedStripeInputStream stripeStream
= createStripeInputStream(dnMap, blockLength)) {
try (ECBlockReconstructedInputStream stream =
- new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+ new ECBlockReconstructedInputStream(repConfig, bufferPool,
+ stripeStream)) {
ByteBuffer b = ByteBuffer.allocate(readBufferSize);
dataGenerator = new SplittableRandom(randomSeed);
long read = stream.read(b);
@@ -178,7 +185,8 @@ public class TestECBlockReconstructedInputStream {
try(ECBlockReconstructedStripeInputStream stripeStream
= createStripeInputStream(dnMap, blockLength)) {
try (ECBlockReconstructedInputStream stream =
- new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+ new ECBlockReconstructedInputStream(repConfig, bufferPool,
+ stripeStream)) {
dataGenerator = new SplittableRandom(randomSeed);
int totalRead = 0;
@@ -213,7 +221,8 @@ public class TestECBlockReconstructedInputStream {
try(ECBlockReconstructedStripeInputStream stripeStream
= createStripeInputStream(dnMap, blockLength)) {
try (ECBlockReconstructedInputStream stream =
- new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+ new ECBlockReconstructedInputStream(repConfig, bufferPool,
+ stripeStream)) {
int totalRead = 0;
dataGenerator = new SplittableRandom(randomSeed);
while (totalRead < blockLength) {
@@ -249,7 +258,8 @@ public class TestECBlockReconstructedInputStream {
try(ECBlockReconstructedStripeInputStream stripeStream
= createStripeInputStream(dnMap, blockLength)) {
try (ECBlockReconstructedInputStream stream =
- new ECBlockReconstructedInputStream(repConfig, stripeStream)) {
+ new ECBlockReconstructedInputStream(repConfig, bufferPool,
+ stripeStream)) {
ByteBuffer b = ByteBuffer.allocate(readBufferSize);
int seekPosition = 0;
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
index 589ceac..079e944 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.client.rpc.read;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
import org.apache.hadoop.ozone.client.io.InsufficientLocationsException;
@@ -52,6 +54,7 @@ public class TestECBlockReconstructedStripeInputStream {
private long randomSeed;
private ThreadLocalRandom random = ThreadLocalRandom.current();
private SplittableRandom dataGen;
+ private ByteBufferPool bufferPool = new ElasticByteBufferPool();
@Before
public void setup() {
@@ -68,28 +71,21 @@ public class TestECBlockReconstructedStripeInputStream {
// One chunk, only 1 location.
OmKeyLocationInfo keyInfo = ECStreamTestUtil
.createKeyInfo(repConfig, 1, ONEMB);
- try (ECBlockInputStream ecb =
- new ECBlockReconstructedStripeInputStream(repConfig,
- keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+ try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
Assert.assertTrue(ecb.hasSufficientLocations());
}
// Two Chunks, but missing data block 2.
Map<DatanodeDetails, Integer> dnMap
= ECStreamTestUtil.createIndexMap(1, 4, 5);
keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 2, dnMap);
- try (ECBlockInputStream ecb =
- new ECBlockReconstructedStripeInputStream(repConfig,
- keyInfo, true, null, null,
- new ECStreamTestUtil.TestBlockInputStreamFactory())) {
+ try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
Assert.assertTrue(ecb.hasSufficientLocations());
}
// Three Chunks, but missing data block 2 and 3.
dnMap = ECStreamTestUtil.createIndexMap(1, 4, 5);
keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 3, dnMap);
- try (ECBlockInputStream ecb =
- new ECBlockReconstructedStripeInputStream(repConfig,
- keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+ try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
Assert.assertTrue(ecb.hasSufficientLocations());
// Set a failed location
List<DatanodeDetails> failed = new ArrayList<>();
@@ -101,18 +97,14 @@ public class TestECBlockReconstructedStripeInputStream {
// Three Chunks, but missing data block 2 and 3 and parity 1.
dnMap = ECStreamTestUtil.createIndexMap(1, 4);
keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 3, dnMap);
- try (ECBlockInputStream ecb =
- new ECBlockReconstructedStripeInputStream(repConfig,
- keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+ try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
Assert.assertFalse(ecb.hasSufficientLocations());
}
// Three Chunks, all available but fail 3
dnMap = ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 3, dnMap);
- try (ECBlockInputStream ecb =
- new ECBlockReconstructedStripeInputStream(repConfig,
- keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+ try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
Assert.assertTrue(ecb.hasSufficientLocations());
// Set a failed location
List<DatanodeDetails> failed = new ArrayList<>();
@@ -161,8 +153,7 @@ public class TestECBlockReconstructedStripeInputStream {
ByteBuffer[] bufs = allocateByteBuffers(repConfig);
dataGen = new SplittableRandom(randomSeed);
try (ECBlockReconstructedStripeInputStream ecb =
- new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
- null, null, streamFactory)) {
+ createInputStream(keyInfo)) {
// Read 3 full stripes
for (int i = 0; i < 3; i++) {
int read = ecb.readStripe(bufs);
@@ -215,8 +206,7 @@ public class TestECBlockReconstructedStripeInputStream {
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
dataGen = new SplittableRandom(randomSeed);
try (ECBlockReconstructedStripeInputStream ecb =
- new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
- null, null, streamFactory)) {
+ createInputStream(keyInfo)) {
int read = ecb.readStripe(bufs);
Assert.assertEquals(blockLength, read);
ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
@@ -258,8 +248,7 @@ public class TestECBlockReconstructedStripeInputStream {
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
dataGen = new SplittableRandom(randomSeed);
try (ECBlockReconstructedStripeInputStream ecb =
- new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
- null, null, streamFactory)) {
+ createInputStream(keyInfo)) {
int read = ecb.readStripe(bufs);
Assert.assertEquals(blockLength, read);
ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
@@ -316,8 +305,7 @@ public class TestECBlockReconstructedStripeInputStream {
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
dataGen = new SplittableRandom(randomSeed);
try (ECBlockReconstructedStripeInputStream ecb =
- new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
- null, null, streamFactory)) {
+ createInputStream(keyInfo)) {
int read = ecb.readStripe(bufs);
Assert.assertEquals(blockLength, read);
ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
@@ -359,8 +347,7 @@ public class TestECBlockReconstructedStripeInputStream {
ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
try (ECBlockReconstructedStripeInputStream ecb =
- new ECBlockReconstructedStripeInputStream(repConfig, keyInfo,
true,
- null, null, streamFactory)) {
+ createInputStream(keyInfo)) {
try {
ecb.readStripe(bufs);
Assert.fail("Read should have thrown an exception");
@@ -404,8 +391,8 @@ public class TestECBlockReconstructedStripeInputStream {
ByteBuffer[] bufs = allocateByteBuffers(repConfig);
try (ECBlockReconstructedStripeInputStream ecb =
- new ECBlockReconstructedStripeInputStream(repConfig, keyInfo,
- true, null, null, streamFactory)) {
+ createInputStream(keyInfo)) {
+
// Read Stripe 1
int read = ecb.readStripe(bufs);
for (int j = 0; j < bufs.length; j++) {
@@ -457,8 +444,7 @@ public class TestECBlockReconstructedStripeInputStream {
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
try (ECBlockReconstructedStripeInputStream ecb =
- new ECBlockReconstructedStripeInputStream(repConfig, keyInfo,
- true, null, null, streamFactory)) {
+ createInputStream(keyInfo)) {
try {
ecb.seek(10);
Assert.fail("Seek should have thrown an exception");
@@ -503,8 +489,7 @@ public class TestECBlockReconstructedStripeInputStream {
ByteBuffer[] bufs = allocateByteBuffers(repConfig);
try (ECBlockReconstructedStripeInputStream ecb =
- new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
- null, null, streamFactory)) {
+ createInputStream(keyInfo)) {
// After reading the first stripe, make one of the streams error
for (int i = 0; i < 3; i++) {
int read = ecb.readStripe(bufs);
@@ -565,8 +550,7 @@ public class TestECBlockReconstructedStripeInputStream {
ByteBuffer[] bufs = allocateByteBuffers(repConfig);
dataGen = new SplittableRandom(randomSeed);
try (ECBlockReconstructedStripeInputStream ecb =
- new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
- null, null, streamFactory)) {
+ createInputStream(keyInfo)) {
List<DatanodeDetails> failed = new ArrayList<>();
// Set the first 3 DNs as failed
for (Map.Entry<DatanodeDetails, Integer> e : dnMap.entrySet()) {
@@ -592,6 +576,12 @@ public class TestECBlockReconstructedStripeInputStream {
}
}
+ private ECBlockReconstructedStripeInputStream createInputStream(
+ OmKeyLocationInfo keyInfo) {
+ return new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
+ null, null, streamFactory, bufferPool);
+ }
+
private List<Integer> indexesToList(int... indexes) {
List<Integer> list = new ArrayList<>();
for (int i : indexes) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]