This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new 074203f HDDS-6437. EC: Avoid allocating buffers in EC Reconstruction
Streams until first read (#3181)
074203f is described below
commit 074203fc0aa23a7a086a879f5a639bbc087a8f17
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Mon Mar 14 19:43:08 2022 +0000
HDDS-6437. EC: Avoid allocating buffers in EC Reconstruction Streams until
first read (#3181)
---
.../client/io/ECBlockReconstructedInputStream.java | 14 +++++++----
.../io/ECBlockReconstructedStripeInputStream.java | 29 +++++++++++++---------
2 files changed, 26 insertions(+), 17 deletions(-)
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
index 6894719..034a3a2 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
@@ -47,8 +47,6 @@ public class ECBlockReconstructedInputStream extends
BlockExtendedInputStream {
this.repConfig = repConfig;
this.byteBufferPool = byteBufferPool;
this.stripeReader = stripeReader;
-
- allocateBuffers();
}
@Override
@@ -75,6 +73,7 @@ public class ECBlockReconstructedInputStream extends
BlockExtendedInputStream {
@Override
public synchronized int read(ByteBuffer buf) throws IOException {
ensureNotClosed();
+ allocateBuffers();
if (!hasRemaining()) {
return EOF;
}
@@ -142,9 +141,11 @@ public class ECBlockReconstructedInputStream extends
BlockExtendedInputStream {
@Override
public synchronized void close() throws IOException {
stripeReader.close();
- for (int i = 0; i < bufs.length; i++) {
- byteBufferPool.putBuffer(bufs[i]);
- bufs[i] = null;
+ if (bufs != null) {
+ for (int i = 0; i < bufs.length; i++) {
+ byteBufferPool.putBuffer(bufs[i]);
+ bufs[i] = null;
+ }
}
closed = true;
}
@@ -193,6 +194,9 @@ public class ECBlockReconstructedInputStream extends
BlockExtendedInputStream {
}
private void allocateBuffers() {
+ if (bufs != null) {
+ return;
+ }
bufs = new ByteBuffer[repConfig.getData()];
for (int i = 0; i < repConfig.getData(); i++) {
bufs[i] = byteBufferPool.getBuffer(false, repConfig.getEcChunkSize());
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
index 91c3797..2b8d231 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -108,7 +108,7 @@ public class ECBlockReconstructedStripeInputStream extends
ECBlockInputStream {
private Set<Integer> failedDataIndexes = new HashSet<>();
private ByteBufferPool byteBufferPool;
- private final RawErasureDecoder decoder;
+ private RawErasureDecoder decoder;
private boolean initialized = false;
@@ -125,11 +125,6 @@ public class ECBlockReconstructedStripeInputStream extends
ECBlockInputStream {
refreshFunction, streamFactory);
this.byteBufferPool = byteBufferPool;
this.executor = ecReconstructExecutor;
- decoder = CodecUtil.createRawDecoderWithFallback(repConfig);
-
- // The EC decoder needs an array data+parity long, with missing or not
- // needed indexes set to null.
- decoderInputBuffers = new ByteBuffer[getRepConfig().getRequiredNodes()];
}
/**
@@ -160,6 +155,14 @@ public class ECBlockReconstructedStripeInputStream extends
ECBlockInputStream {
}
protected void init() throws InsufficientLocationsException {
+ if (decoder == null) {
+ decoder = CodecUtil.createRawDecoderWithFallback(getRepConfig());
+ }
+ if (decoderInputBuffers == null) {
+ // The EC decoder needs an array data+parity long, with missing or not
+ // needed indexes set to null.
+ decoderInputBuffers = new ByteBuffer[getRepConfig().getRequiredNodes()];
+ }
if (!hasSufficientLocations()) {
throw new InsufficientLocationsException("There are insufficient " +
"datanodes to read the EC block");
@@ -578,12 +581,14 @@ public class ECBlockReconstructedStripeInputStream
extends ECBlockInputStream {
// Inside this class, we only allocate buffers to read parity into. Data
// is reconstructed or read into a set of buffers passed in from the
calling
// class. Therefore we only need to ensure we free the parity buffers here.
- for (int i = getRepConfig().getData();
- i < getRepConfig().getRequiredNodes(); i++) {
- ByteBuffer buf = decoderInputBuffers[i];
- if (buf != null) {
- byteBufferPool.putBuffer(buf);
- decoderInputBuffers[i] = null;
+ if (decoderInputBuffers != null) {
+ for (int i = getRepConfig().getData();
+ i < getRepConfig().getRequiredNodes(); i++) {
+ ByteBuffer buf = decoderInputBuffers[i];
+ if (buf != null) {
+ byteBufferPool.putBuffer(buf);
+ decoderInputBuffers[i] = null;
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]