This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-28028
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-28028 by this push:
new 5e5c210bf85 HBASE-28028 Read all compressed bytes to a byte array
before submitting them to decompressor
5e5c210bf85 is described below
commit 5e5c210bf8553b9a9e09adc7aedf6a10fb5953cc
Author: Duo Zhang <[email protected]>
AuthorDate: Thu Aug 17 22:54:19 2023 +0800
HBASE-28028 Read all compressed bytes to a byte array before submitting
them to decompressor
---
.../hbase/regionserver/wal/CompressionContext.java | 8 +--
...LDecompressionBoundedDelegatingInputStream.java | 78 ++++++++++------------
2 files changed, 40 insertions(+), 46 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
index 73cf4821db0..8f6d1792954 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
@@ -28,7 +28,6 @@ import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.io.BoundedDelegatingInputStream;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.util.Dictionary;
@@ -77,7 +76,7 @@ public class CompressionContext {
private final Compression.Algorithm algorithm;
private Compressor compressor;
private Decompressor decompressor;
- private BoundedDelegatingInputStream lowerIn;
+ private WALDecompressionBoundedDelegatingInputStream lowerIn;
private ByteArrayOutputStream lowerOut;
private InputStream compressedIn;
private OutputStream compressedOut;
@@ -115,13 +114,11 @@ public class CompressionContext {
// Create the input streams here the first time around.
if (compressedIn == null) {
- lowerIn = new BoundedDelegatingInputStream(in, inLength);
+ lowerIn = new WALDecompressionBoundedDelegatingInputStream(in);
if (decompressor == null) {
decompressor = algorithm.getDecompressor();
}
compressedIn = algorithm.createDecompressionStream(lowerIn,
decompressor, IO_BUFFER_SIZE);
- } else {
- lowerIn.setDelegate(in, inLength);
}
if (outLength == 0) {
// The BufferedInputStream will return earlier and skip reading
anything if outLength == 0,
@@ -131,6 +128,7 @@ public class CompressionContext {
// such as data loss when splitting wal or replicating wal.
IOUtils.skipFully(in, inLength);
} else {
+ lowerIn.resetLimit(inLength);
IOUtils.readFully(compressedIn, outArray, outOffset, outLength);
}
}
diff --git
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALDecompressionBoundedDelegatingInputStream.java
similarity index 57%
rename from
hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
rename to
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALDecompressionBoundedDelegatingInputStream.java
index 2a6db09050c..688a5053c6e 100644
---
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALDecompressionBoundedDelegatingInputStream.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,75 +15,76 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.io;
+package org.apache.hadoop.hbase.regionserver.wal;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import org.apache.commons.io.IOUtils;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * This is a stream that will only supply bytes from its delegate up to a
certain limit. When there
- * is an attempt to set the position beyond that it will signal that the input
is finished.
+ * This class is only used by WAL ValueCompressor for decompression.
+ * <p>
+ * <strong>WARNING: </strong>The implementation is very tricky and does not
follow the typical
+ * InputStream pattern, so do not use it in any other places.
*/
@InterfaceAudience.Private
-public class BoundedDelegatingInputStream extends DelegatingInputStream {
+class WALDecompressionBoundedDelegatingInputStream extends InputStream {
- protected long limit;
- protected long pos;
+ private static final Logger LOG =
+
LoggerFactory.getLogger(WALDecompressionBoundedDelegatingInputStream.class);
- public BoundedDelegatingInputStream(InputStream in, long limit) {
- super(in);
- this.limit = limit;
- this.pos = 0;
- }
+ private final InputStream in;
+
+ private long pos;
+
+ private long limit;
- public void setDelegate(InputStream in, long limit) {
+ public WALDecompressionBoundedDelegatingInputStream(InputStream in) {
this.in = in;
+ }
+
+ public void resetLimit(long limit) {
this.limit = limit;
this.pos = 0;
}
- /**
- * Call the delegate's {@code read()} method if the current position is less
than the limit.
- * @return the byte read or -1 if the end of stream or the limit has been
reached.
- */
@Override
public int read() throws IOException {
if (pos >= limit) {
return -1;
}
int result = in.read();
+ if (result < 0) {
+ return -1;
+ }
pos++;
return result;
}
- /**
- * Call the delegate's {@code read(byte[], int, int)} method if the current
position is less than
- * the limit.
- * @param b read buffer
- * @param off Start offset
- * @param len The number of bytes to read
- * @return the number of bytes read or -1 if the end of stream or the limit
has been reached.
- */
@Override
- public int read(final byte[] b, final int off, final int len) throws
IOException {
+ public int read(byte[] b, int off, int len) throws IOException {
if (pos >= limit) {
return -1;
}
- long readLen = Math.min(len, limit - pos);
- int read = in.read(b, off, (int) readLen);
- if (read < 0) {
+ int readLen = (int) Math.min(len, limit - pos);
+ try {
+ IOUtils.readFully(in, b, off, readLen);
+ } catch (EOFException e) {
+      // The trick here is that we will always try to read enough bytes to
fill the buffer passed in,
+      // or until we reach the end of this compression block; if there are
not enough bytes, we just
+      // return -1 to let the upper layer fail with EOF
+      // In WAL value decompression this is OK, as if we cannot read all the
data, we will eventually
+      // get an EOF somewhere
+ LOG.debug("Got EOF while we want to read {} bytes from stream", readLen,
e);
return -1;
}
- pos += read;
- return read;
+ return readLen;
}
- /**
- * Call the delegate's {@code skip(long)} method.
- * @param len the number of bytes to skip
- * @return the actual number of bytes skipped
- */
@Override
public long skip(final long len) throws IOException {
long skipped = in.skip(Math.min(len, limit - pos));
@@ -91,10 +92,6 @@ public class BoundedDelegatingInputStream extends
DelegatingInputStream {
return skipped;
}
- /**
- * @return the remaining bytes within the bound if the current position is
less than the limit, or
- * 0 otherwise.
- */
@Override
public int available() throws IOException {
if (pos >= limit) {
@@ -108,5 +105,4 @@ public class BoundedDelegatingInputStream extends
DelegatingInputStream {
// successful decompression depends on this behavior.
return (int) (limit - pos);
}
-
}