This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 2d591ab HBASE-25869 WAL value compression (#3244)
2d591ab is described below
commit 2d591ab3c485115a47e9a2838df497b5419dae42
Author: Andrew Purtell <[email protected]>
AuthorDate: Fri May 21 11:05:52 2021 -0700
HBASE-25869 WAL value compression (#3244)
WAL storage can be expensive, especially if the cell values
represented in the edits are large, consisting of blobs or
significant lengths of text. Such WALs might need to be kept around
for a fairly long time to satisfy replication constraints on a
space-limited (or space-contended) filesystem.
We have a custom dictionary compression scheme for cell metadata that
is engaged when WAL compression is enabled in site configuration.
This is fine for that application, where we can expect the universe
of values and their lengths in the custom dictionaries to be
constrained. For arbitrary cell values it is better to use one of the
available compression codecs, which are suitable for arbitrary albeit
compressible data.
Signed-off-by: Bharath Vissapragada <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
Signed-off-by: Nick Dimiduk <[email protected]>
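As an illustration of the configuration surface this change adds (a minimal sketch, not
part of the patch; the property constants are taken from HConstants and CompressionContext
in this commit, the example class name is hypothetical, and naming "snappy" assumes that
codec is available on the cluster):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;

    public class WalValueCompressionConfigExample {
      public static Configuration create() {
        Configuration conf = HBaseConfiguration.create();
        // WAL (dictionary) compression must be enabled for value compression to apply.
        conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
        // Opt in to value compression; GZ is used when no type is configured.
        conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
        // Optionally choose another codec by name.
        conf.set(CompressionContext.WAL_VALUE_COMPRESSION_TYPE, "snappy");
        return conf;
      }
    }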
---
.../java/org/apache/hadoop/hbase/KeyValue.java | 2 -
.../hbase/io/BoundedDelegatingInputStream.java | 111 ++++++++++++++
.../hadoop/hbase/io/DelegatingInputStream.java | 54 +++++++
hbase-protocol-shaded/src/main/protobuf/WAL.proto | 2 +
hbase-protocol/src/main/protobuf/WAL.proto | 2 +
.../wal/AbstractProtobufLogWriter.java | 38 ++++-
.../hbase/regionserver/wal/CompressionContext.java | 169 ++++++++++++++++++++-
.../hbase/regionserver/wal/ProtobufLogReader.java | 27 +++-
.../hadoop/hbase/regionserver/wal/ReaderBase.java | 20 ++-
.../hbase/regionserver/wal/WALCellCodec.java | 61 ++++++--
.../wal/TestAsyncWALReplayValueCompression.java | 43 ++++++
.../wal/TestWALCellCodecWithCompression.java | 115 ++++++++++++--
.../wal/TestWALReplayValueCompression.java | 46 ++++++
.../apache/hadoop/hbase/wal/TestCompressedWAL.java | 159 +++++++++++++++++++
.../hbase/wal/TestWALSplitValueCompression.java | 44 ++++++
15 files changed, 849 insertions(+), 44 deletions(-)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index 79356ed..4aa75bb 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -76,8 +76,6 @@ import org.slf4j.LoggerFactory;
*/
@InterfaceAudience.Private
public class KeyValue implements ExtendedCell, Cloneable {
- private static final ArrayList<Tag> EMPTY_ARRAY_LIST = new ArrayList<>();
-
private static final Logger LOG = LoggerFactory.getLogger(KeyValue.class);
public static final int FIXED_OVERHEAD = ClassSize.OBJECT + // the KeyValue object itself
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
new file mode 100644
index 0000000..e9a3b67
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * This is a stream that will only supply bytes from its delegate up to a certain limit.
+ * When there is an attempt to set the position beyond that it will signal that the input
+ * is finished.
+ */
[email protected]
+public class BoundedDelegatingInputStream extends DelegatingInputStream {
+
+ protected long limit;
+ protected long pos;
+
+ public BoundedDelegatingInputStream(InputStream in, long limit) {
+ super(in);
+ this.limit = limit;
+ this.pos = 0;
+ }
+
+ public void setDelegate(InputStream in, long limit) {
+ this.in = in;
+ this.limit = limit;
+ this.pos = 0;
+ }
+
+ /**
+ * Call the delegate's {@code read()} method if the current position is less than the limit.
+ * @return the byte read or -1 if the end of stream or the limit has been reached.
+ */
+ @Override
+ public int read() throws IOException {
+ if (pos >= limit) {
+ return -1;
+ }
+ int result = in.read();
+ pos++;
+ return result;
+ }
+
+ /**
+ * Call the delegate's {@code read(byte[], int, int)} method if the current position is less
+ * than the limit.
+ * @param b read buffer
+ * @param off Start offset
+ * @param len The number of bytes to read
+ * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
+ */
+ @Override
+ public int read(final byte[] b, final int off, final int len) throws IOException {
+ if (pos >= limit) {
+ return -1;
+ }
+ long readLen = Math.min(len, limit - pos);
+ int read = in.read(b, off, (int)readLen);
+ if (read < 0) {
+ return -1;
+ }
+ pos += read;
+ return read;
+ }
+
+ /**
+ * Call the delegate's {@code skip(long)} method.
+ * @param len the number of bytes to skip
+ * @return the actual number of bytes skipped
+ */
+ @Override
+ public long skip(final long len) throws IOException {
+ long skipped = in.skip(Math.min(len, limit - pos));
+ pos += skipped;
+ return skipped;
+ }
+
+ /**
+ * Call the delegate's {@code available()} method.
+ * @return the delegate's available bytes if the current position is less than the limit,
+ * or 0 otherwise
+ */
+ @Override
+ public int available() throws IOException {
+ if (pos >= limit) {
+ return 0;
+ }
+ int available = in.available();
+ return (int) Math.min(available, limit - pos);
+ }
+
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java
new file mode 100644
index 0000000..6bd82ae
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.io.FilterInputStream;
+import java.io.InputStream;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * An input stream that delegates all operations to another input stream.
+ * The delegate can be switched out for another at any time but to minimize the
+ * possibility of violating the InputStream contract it would be best to replace
+ * the delegate only once it has been fully consumed. <p> For example, a
+ * ByteArrayInputStream, which is implicitly bounded by the size of the underlying
+ * byte array can be converted into an unbounded stream fed by multiple instances
+ * of ByteArrayInputStream, switched out one for the other in sequence.
+ * <p>
+ * Although multithreaded access is allowed, users of this class will want to take
+ * care to order operations on this stream and the swap out of one delegate for
+ * another in a way that provides a valid view of stream contents.
+ */
[email protected]
+public class DelegatingInputStream extends FilterInputStream {
+
+ public DelegatingInputStream(InputStream in) {
+ super(in);
+ }
+
+ public InputStream getDelegate() {
+ return this.in;
+ }
+
+ public void setDelegate(InputStream in) {
+ this.in = in;
+ }
+
+}
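A rough usage sketch of the delegate-swap pattern the class comment above describes
(illustrative only, not part of the patch; the example class name is hypothetical):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import org.apache.hadoop.hbase.io.DelegatingInputStream;

    public class DelegateSwapExample {
      public static void main(String[] args) throws IOException {
        DelegatingInputStream in =
            new DelegatingInputStream(new ByteArrayInputStream("hello ".getBytes()));
        StringBuilder sb = new StringBuilder();
        int b;
        // Drain the first (implicitly bounded) segment...
        while ((b = in.read()) != -1) {
          sb.append((char) b);
        }
        // ...then swap in the next segment and keep reading from the same stream object.
        in.setDelegate(new ByteArrayInputStream("world".getBytes()));
        while ((b = in.read()) != -1) {
          sb.append((char) b);
        }
        System.out.println(sb); // prints: hello world
      }
    }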
diff --git a/hbase-protocol-shaded/src/main/protobuf/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
index fd622cf..48a108b 100644
--- a/hbase-protocol-shaded/src/main/protobuf/WAL.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
@@ -32,6 +32,8 @@ message WALHeader {
optional bool has_tag_compression = 3;
optional string writer_cls_name = 4;
optional string cell_codec_cls_name = 5;
+ optional bool has_value_compression = 6;
+ optional uint32 value_compression_algorithm = 7;
}
/*
diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto
index 3e0b15c..581e973 100644
--- a/hbase-protocol/src/main/protobuf/WAL.proto
+++ b/hbase-protocol/src/main/protobuf/WAL.proto
@@ -33,6 +33,8 @@ message WALHeader {
optional bool has_tag_compression = 3;
optional string writer_cls_name = 4;
optional string cell_codec_cls_name = 5;
+ optional bool has_value_compression = 6;
+ optional uint32 value_compression_algorithm = 7;
}
/*
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
index a94e5c8..3b84488 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.crypto.Encryptor;
@@ -144,9 +145,22 @@ public abstract class AbstractProtobufLogWriter {
boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
if (doCompress) {
try {
+ final boolean useTagCompression =
+ conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
+ final boolean useValueCompression =
+ conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
+ final Compression.Algorithm valueCompressionType =
+ useValueCompression ? CompressionContext.getValueCompressionAlgorithm(conf) :
+ Compression.Algorithm.NONE;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Initializing compression context for {}:
isRecoveredEdits={}" +
+ ", hasTagCompression={}, hasValueCompression={},
valueCompressionType={}", path,
+ CommonFSUtils.isRecoveredEdits(path), useTagCompression,
useValueCompression,
+ valueCompressionType);
+ }
this.compressionContext =
new CompressionContext(LRUDictionary.class, CommonFSUtils.isRecoveredEdits(path),
- conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true));
+ useTagCompression, useValueCompression, valueCompressionType);
} catch (Exception e) {
throw new IOException("Failed to initiate CompressionContext", e);
}
@@ -165,17 +179,29 @@ public abstract class AbstractProtobufLogWriter {
initOutput(fs, path, overwritable, bufferSize, replication, blocksize);
- boolean doTagCompress = doCompress
- && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
- length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, buildWALHeader(conf,
- WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress))));
+ boolean doTagCompress = doCompress &&
+ conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
+ boolean doValueCompress = doCompress &&
+ conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
+ WALHeader.Builder headerBuilder = WALHeader.newBuilder()
+ .setHasCompression(doCompress)
+ .setHasTagCompression(doTagCompress)
+ .setHasValueCompression(doValueCompress);
+ if (doValueCompress) {
+ headerBuilder.setValueCompressionAlgorithm(
+ CompressionContext.getValueCompressionAlgorithm(conf).ordinal());
+ }
+ length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC,
+ buildWALHeader(conf, headerBuilder)));
initAfterHeader(doCompress);
// instantiate trailer to default value.
trailer = WALTrailer.newBuilder().build();
+
if (LOG.isTraceEnabled()) {
- LOG.trace("Initialized protobuf WAL=" + path + ", compression=" +
doCompress);
+ LOG.trace("Initialized protobuf WAL={}, compression={},
tagCompression={}" +
+ ", valueCompression={}", path, doCompress, doTagCompress,
doValueCompress);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
index 16866e1..82bad93 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
@@ -18,37 +18,155 @@
package org.apache.hadoop.hbase.regionserver.wal;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.EnumMap;
import java.util.Map;
-
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.io.BoundedDelegatingInputStream;
import org.apache.hadoop.hbase.io.TagCompressionContext;
+import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.util.Dictionary;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Context that holds the various dictionaries for compression in WAL.
+ * <p>
+ * CompressionContexts are not expected to be shared among threads. Multithreaded use may
+ * produce unexpected results.
*/
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
public class CompressionContext {
- static final String ENABLE_WAL_TAGS_COMPRESSION =
- "hbase.regionserver.wal.tags.enablecompression";
+ private static final Logger LOG = LoggerFactory.getLogger(CompressionContext.class);
+
+ public static final String ENABLE_WAL_TAGS_COMPRESSION =
+ "hbase.regionserver.wal.tags.enablecompression";
+
+ public static final String ENABLE_WAL_VALUE_COMPRESSION =
+ "hbase.regionserver.wal.value.enablecompression";
+
+ public static final String WAL_VALUE_COMPRESSION_TYPE =
+ "hbase.regionserver.wal.value.compression.type";
public enum DictionaryIndex {
REGION, TABLE, FAMILY, QUALIFIER, ROW
}
+ /**
+ * Encapsulates the compression algorithm and its streams that we will use for value
+ * compression in this WAL.
+ */
+ static class ValueCompressor {
+
+ static final int IO_BUFFER_SIZE = 4096;
+
+ private final Compression.Algorithm algorithm;
+ private BoundedDelegatingInputStream lowerIn;
+ private ByteArrayOutputStream lowerOut;
+ private InputStream compressedIn;
+ private OutputStream compressedOut;
+
+ public ValueCompressor(Compression.Algorithm algorithm) {
+ this.algorithm = algorithm;
+ }
+
+ public Compression.Algorithm getAlgorithm() {
+ return algorithm;
+ }
+
+ public byte[] compress(byte[] valueArray, int valueOffset, int valueLength)
+ throws IOException {
+ if (compressedOut == null) {
+ // Create the output streams here the first time around.
+ lowerOut = new ByteArrayOutputStream();
+ compressedOut = algorithm.createCompressionStream(lowerOut, algorithm.getCompressor(),
+ IO_BUFFER_SIZE);
+ } else {
+ lowerOut.reset();
+ }
+ compressedOut.write(valueArray, valueOffset, valueLength);
+ compressedOut.flush();
+ return lowerOut.toByteArray();
+ }
+
+ public int decompress(InputStream in, int inLength, byte[] outArray, int outOffset,
+ int outLength) throws IOException {
+
+ // Our input is a sequence of bounded byte ranges (call them segments), with
+ // BoundedDelegatingInputStream providing a way to switch in a new segment when the
+ // previous segment has been fully consumed.
+
+ // Create the input streams here the first time around.
+ if (compressedIn == null) {
+ lowerIn = new BoundedDelegatingInputStream(in, inLength);
+ compressedIn = algorithm.createDecompressionStream(lowerIn, algorithm.getDecompressor(),
+ IO_BUFFER_SIZE);
+ } else {
+ lowerIn.setDelegate(in, inLength);
+ }
+
+ // Caller must handle short reads.
+ // With current Hadoop compression codecs all 'outLength' bytes are read in here, so not
+ // an issue for now.
+ return compressedIn.read(outArray, outOffset, outLength);
+ }
+
+ public void clear() {
+ if (compressedOut != null) {
+ try {
+ compressedOut.close();
+ } catch (IOException e) {
+ LOG.warn("Exception closing compressed output stream", e);
+ }
+ }
+ compressedOut = null;
+ if (lowerOut != null) {
+ try {
+ lowerOut.close();
+ } catch (IOException e) {
+ LOG.warn("Exception closing lower output stream", e);
+ }
+ }
+ lowerOut = null;
+ if (compressedIn != null) {
+ try {
+ compressedIn.close();
+ } catch (IOException e) {
+ LOG.warn("Exception closing compressed input stream", e);
+ }
+ }
+ compressedIn = null;
+ if (lowerIn != null) {
+ try {
+ lowerIn.close();
+ } catch (IOException e) {
+ LOG.warn("Exception closing lower input stream", e);
+ }
+ }
+ lowerIn = null;
+ }
+
+ }
+
private final Map<DictionaryIndex, Dictionary> dictionaries =
new EnumMap<>(DictionaryIndex.class);
// Context used for compressing tags
TagCompressionContext tagCompressionContext = null;
+ ValueCompressor valueCompressor = null;
- public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits,
- boolean hasTagCompression) throws SecurityException, NoSuchMethodException,
- InstantiationException, IllegalAccessException, InvocationTargetException {
+ public CompressionContext(Class<? extends Dictionary> dictType,
+ boolean recoveredEdits, boolean hasTagCompression, boolean hasValueCompression,
+ Compression.Algorithm valueCompressionType)
+ throws SecurityException, NoSuchMethodException, InstantiationException,
+ IllegalAccessException, InvocationTargetException, IOException {
Constructor<? extends Dictionary> dictConstructor =
dictType.getConstructor();
for (DictionaryIndex dictionaryIndex : DictionaryIndex.values()) {
@@ -70,12 +188,34 @@ public class CompressionContext {
if (hasTagCompression) {
tagCompressionContext = new TagCompressionContext(dictType, Short.MAX_VALUE);
}
+ if (hasValueCompression && valueCompressionType != null) {
+ valueCompressor = new ValueCompressor(valueCompressionType);
+ }
+ }
+
+ public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits,
+ boolean hasTagCompression)
+ throws SecurityException, NoSuchMethodException, InstantiationException,
+ IllegalAccessException, InvocationTargetException, IOException {
+ this(dictType, recoveredEdits, hasTagCompression, false, null);
+ }
+
+ public boolean hasTagCompression() {
+ return tagCompressionContext != null;
+ }
+
+ public boolean hasValueCompression() {
+ return valueCompressor != null;
}
public Dictionary getDictionary(Enum dictIndex) {
return dictionaries.get(dictIndex);
}
+ public ValueCompressor getValueCompressor() {
+ return valueCompressor;
+ }
+
void clear() {
for(Dictionary dictionary : dictionaries.values()){
dictionary.clear();
@@ -83,5 +223,20 @@ public class CompressionContext {
if (tagCompressionContext != null) {
tagCompressionContext.clear();
}
+ if (valueCompressor != null) {
+ valueCompressor.clear();
+ }
}
+
+ public static Compression.Algorithm getValueCompressionAlgorithm(Configuration conf) {
+ if (conf.getBoolean(ENABLE_WAL_VALUE_COMPRESSION, true)) {
+ String compressionType = conf.get(WAL_VALUE_COMPRESSION_TYPE);
+ if (compressionType != null) {
+ return Compression.getCompressionAlgorithmByName(compressionType);
+ }
+ return Compression.Algorithm.GZ;
+ }
+ return Compression.Algorithm.NONE;
+ }
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index 9cd48c0..c86dd4d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@@ -81,6 +82,8 @@ public class ProtobufLogReader extends ReaderBase {
protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;
protected boolean hasCompression = false;
protected boolean hasTagCompression = false;
+ protected boolean hasValueCompression = false;
+ protected Compression.Algorithm valueCompressionType = null;
// walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit
// entry in the wal, the inputstream's position is equal to walEditsStopOffset.
private long walEditsStopOffset;
@@ -227,6 +230,16 @@ public class ProtobufLogReader extends ReaderBase {
WALProtos.WALHeader header = builder.build();
this.hasCompression = header.hasHasCompression() && header.getHasCompression();
this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression();
+ this.hasValueCompression = header.hasHasValueCompression() &&
+ header.getHasValueCompression();
+ if (header.hasValueCompressionAlgorithm()) {
+ try {
+ this.valueCompressionType =
+ Compression.Algorithm.values()[header.getValueCompressionAlgorithm()];
+ } catch (ArrayIndexOutOfBoundsException e) {
+ throw new IOException("Invalid compression type", e);
+ }
+ }
}
this.inputStream = stream;
this.walEditsStopOffset = this.fileLength;
@@ -235,7 +248,9 @@ public class ProtobufLogReader extends ReaderBase {
this.seekOnFs(currentPosition);
if (LOG.isTraceEnabled()) {
LOG.trace("After reading the trailer: walEditsStopOffset: " +
this.walEditsStopOffset
- + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " +
(trailerPresent ? "true, size: " + trailer.getSerializedSize() : "false") + ",
currentPosition: " + currentPosition);
+ + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " +
+ (trailerPresent ? "true, size: " + trailer.getSerializedSize() :
"false") +
+ ", currentPosition: " + currentPosition);
}
codecClsName = hdrCtxt.getCellCodecClsName();
@@ -328,6 +343,16 @@ public class ProtobufLogReader extends ReaderBase {
}
@Override
+ protected boolean hasValueCompression() {
+ return this.hasValueCompression;
+ }
+
+ @Override
+ protected Compression.Algorithm getValueCompressionAlgorithm() {
+ return this.valueCompressionType;
+ }
+
+ @Override
protected boolean readNext(Entry entry) throws IOException {
// OriginalPosition might be < 0 on local fs; if so, it is useless to us.
long originalPosition = this.inputStream.getPos();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
index 9b6d69a..90a1653 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@@ -68,8 +69,15 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
// If compression is enabled, new dictionaries are created here.
try {
if (compressionContext == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initializing compression context for {}:
isRecoveredEdits={}" +
+ ", hasTagCompression={}, hasValueCompression={},
valueCompressionType={}", path,
+ CommonFSUtils.isRecoveredEdits(path), hasTagCompression(),
hasValueCompression(),
+ getValueCompressionAlgorithm());
+ }
compressionContext = new CompressionContext(LRUDictionary.class,
- CommonFSUtils.isRecoveredEdits(path), hasTagCompression());
+ CommonFSUtils.isRecoveredEdits(path), hasTagCompression(),
+ hasValueCompression(), getValueCompressionAlgorithm());
} else {
compressionContext.clear();
}
@@ -152,6 +160,16 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
protected abstract boolean hasTagCompression();
/**
+ * @return Whether value compression is enabled for this log.
+ */
+ protected abstract boolean hasValueCompression();
+
+ /**
+ * @return Value compression algorithm for this log.
+ */
+ protected abstract Compression.Algorithm getValueCompressionAlgorithm();
+
+ /**
* Read next entry.
* @param e The entry to read into.
* @return Whether there was anything to read.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
index 5aa943f..31eccc7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
@@ -222,9 +222,13 @@ public class WALCellCodec implements Codec {
static class CompressedKvEncoder extends BaseEncoder {
private final CompressionContext compression;
+ private final boolean hasValueCompression;
+ private final boolean hasTagCompression;
public CompressedKvEncoder(OutputStream out, CompressionContext compression) {
super(out);
this.compression = compression;
+ this.hasValueCompression = compression.hasValueCompression();
+ this.hasTagCompression = compression.hasTagCompression();
}
@Override
@@ -241,12 +245,16 @@ public class WALCellCodec implements Codec {
compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
PrivateCellUtil.compressQualifier(out, cell,
compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
- // Write timestamp, type and value as uncompressed.
+ // Write timestamp, type and value.
StreamUtils.writeLong(out, cell.getTimestamp());
out.write(cell.getTypeByte());
- PrivateCellUtil.writeValue(out, cell, cell.getValueLength());
+ if (hasValueCompression) {
+ writeCompressedValue(out, cell);
+ } else {
+ PrivateCellUtil.writeValue(out, cell, cell.getValueLength());
+ }
if (tagsLength > 0) {
- if (compression.tagCompressionContext != null) {
+ if (hasTagCompression) {
// Write tags using Dictionary compression
PrivateCellUtil.compressTags(out, cell,
compression.tagCompressionContext);
} else {
@@ -256,20 +264,31 @@ public class WALCellCodec implements Codec {
}
}
}
+
+ private void writeCompressedValue(OutputStream out, Cell cell) throws IOException {
+ byte[] compressed = compression.getValueCompressor().compress(cell.getValueArray(),
+ cell.getValueOffset(), cell.getValueLength());
+ StreamUtils.writeRawVInt32(out, compressed.length);
+ out.write(compressed);
+ }
+
}
static class CompressedKvDecoder extends BaseDecoder {
private final CompressionContext compression;
+ private final boolean hasValueCompression;
+ private final boolean hasTagCompression;
public CompressedKvDecoder(InputStream in, CompressionContext compression) {
super(in);
this.compression = compression;
+ this.hasValueCompression = compression.hasValueCompression();
+ this.hasTagCompression = compression.hasTagCompression();
}
@Override
protected Cell parseCell() throws IOException {
int keylength = StreamUtils.readRawVarint32(in);
int vlength = StreamUtils.readRawVarint32(in);
-
int tagsLength = StreamUtils.readRawVarint32(in);
int length = 0;
if(tagsLength == 0) {
@@ -302,18 +321,27 @@ public class WALCellCodec implements Codec {
compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
pos += elemLen;
- // timestamp, type and value
- int tsTypeValLen = length - pos;
+ // timestamp
+ long ts = StreamUtils.readLong(in);
+ pos = Bytes.putLong(backingArray, pos, ts);
+ // type and value
+ int typeValLen = length - pos;
if (tagsLength > 0) {
- tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
+ typeValLen = typeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
+ }
+ pos = Bytes.putByte(backingArray, pos, (byte)in.read());
+ int valLen = typeValLen - 1;
+ if (hasValueCompression) {
+ readCompressedValue(in, backingArray, pos, valLen);
+ pos += valLen;
+ } else {
+ IOUtils.readFully(in, backingArray, pos, valLen);
+ pos += valLen;
}
- IOUtils.readFully(in, backingArray, pos, tsTypeValLen);
- pos += tsTypeValLen;
-
// tags
if (tagsLength > 0) {
pos = Bytes.putAsShort(backingArray, pos, tagsLength);
- if (compression.tagCompressionContext != null) {
+ if (hasTagCompression) {
compression.tagCompressionContext.uncompressTags(in, backingArray,
pos, tagsLength);
} else {
IOUtils.readFully(in, backingArray, pos, tagsLength);
@@ -349,6 +377,17 @@ public class WALCellCodec implements Codec {
throw new IOException("Invalid length for compresesed portion of
keyvalue: " + len);
}
}
+
+ private void readCompressedValue(InputStream in, byte[] outArray, int outOffset,
+ int expectedLength) throws IOException {
+ int compressedLen = StreamUtils.readRawVarint32(in);
+ int read = compression.getValueCompressor().decompress(in, compressedLen, outArray,
+ outOffset, expectedLength);
+ if (read != expectedLength) {
+ throw new IOException("ValueCompressor state error: short read");
+ }
+ }
+
}
public static class EnsureKvEncoder extends BaseEncoder {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplayValueCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplayValueCompression.java
new file mode 100644
index 0000000..cbe1faa
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplayValueCompression.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestAsyncWALReplayValueCompression extends TestAsyncWALReplay {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestAsyncWALReplayValueCompression.class);
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
+ conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+ conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
+ TestAsyncWALReplay.setUpBeforeClass();
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
index dd9ee69..b5504bf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -25,6 +26,7 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.ByteBufferKeyValue;
@@ -35,6 +37,7 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.codec.Codec.Decoder;
import org.apache.hadoop.hbase.codec.Codec.Encoder;
+import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -62,24 +65,108 @@ public class TestWALCellCodecWithCompression {
@Test
public void testEncodeDecodeOffKVsWithTagsWithTagsCompression() throws Exception {
- doTest(true, true);
+ doTest(true, false);
+ }
+
+ @Test
+ public void testValueCompressionEnabled() throws Exception {
+ doTest(false, true);
+ }
+
+ @Test
+ public void testValueCompression() throws Exception {
+ final byte[] row_1 = Bytes.toBytes("row_1");
+ final byte[] value_1 = new byte[20];
+ Bytes.zero(value_1);
+ final byte[] row_2 = Bytes.toBytes("row_2");
+ final byte[] value_2 = new byte[Bytes.SIZEOF_LONG];
+ Bytes.random(value_2);
+ final byte[] row_3 = Bytes.toBytes("row_3");
+ final byte[] value_3 = new byte[100];
+ Bytes.random(value_3);
+ final byte[] row_4 = Bytes.toBytes("row_4");
+ final byte[] value_4 = new byte[128];
+ fillBytes(value_4, Bytes.toBytes("DEADBEEF"));
+ final byte[] row_5 = Bytes.toBytes("row_5");
+ final byte[] value_5 = new byte[64];
+ fillBytes(value_5, Bytes.toBytes("CAFEBABE"));
+
+ Configuration conf = new Configuration(false);
+ WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class,
+ false, true, true, Compression.Algorithm.GZ));
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ Encoder encoder = codec.getEncoder(bos);
+ encoder.write(createKV(row_1, value_1, 0));
+ encoder.write(createKV(row_2, value_2, 0));
+ encoder.write(createKV(row_3, value_3, 0));
+ encoder.write(createKV(row_4, value_4, 0));
+ encoder.write(createKV(row_5, value_5, 0));
+ encoder.flush();
+
+ try (InputStream is = new ByteArrayInputStream(bos.toByteArray())) {
+ Decoder decoder = codec.getDecoder(is);
+ decoder.advance();
+ KeyValue kv = (KeyValue) decoder.current();
+ assertTrue(Bytes.equals(row_1, 0, row_1.length,
+ kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
+ assertTrue(Bytes.equals(value_1, 0, value_1.length,
+ kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
+ decoder.advance();
+ kv = (KeyValue) decoder.current();
+ assertTrue(Bytes.equals(row_2, 0, row_2.length,
+ kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
+ assertTrue(Bytes.equals(value_2, 0, value_2.length,
+ kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
+ decoder.advance();
+ kv = (KeyValue) decoder.current();
+ assertTrue(Bytes.equals(row_3, 0, row_3.length,
+ kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
+ assertTrue(Bytes.equals(value_3, 0, value_3.length,
+ kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
+ decoder.advance();
+ kv = (KeyValue) decoder.current();
+ assertTrue(Bytes.equals(row_4, 0, row_4.length,
+ kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
+ assertTrue(Bytes.equals(value_4, 0, value_4.length,
+ kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
+ decoder.advance();
+ kv = (KeyValue) decoder.current();
+ assertTrue(Bytes.equals(row_5, 0, row_5.length,
+ kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
+ assertTrue(Bytes.equals(value_5, 0, value_5.length,
+ kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
+ }
+ }
+
+ static void fillBytes(byte[] buffer, byte[] fill) {
+ int offset = 0;
+ int remaining = buffer.length;
+ while (remaining > 0) {
+ int len = remaining < fill.length ? remaining : fill.length;
+ System.arraycopy(fill, 0, buffer, offset, len);
+ offset += len;
+ remaining -= len;
+ }
}
- private void doTest(boolean compressTags, boolean offheapKV) throws Exception {
+ private void doTest(boolean compressTags, boolean offheapKV)
+ throws Exception {
+ final byte[] key = Bytes.toBytes("myRow");
+ final byte[] value = Bytes.toBytes("myValue");
Configuration conf = new Configuration(false);
conf.setBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, compressTags);
- WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, false,
- compressTags));
+ WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class,
+ false, compressTags));
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
Encoder encoder = codec.getEncoder(bos);
if (offheapKV) {
- encoder.write(createOffheapKV(1));
- encoder.write(createOffheapKV(0));
- encoder.write(createOffheapKV(2));
+ encoder.write(createOffheapKV(key, value, 1));
+ encoder.write(createOffheapKV(key, value, 0));
+ encoder.write(createOffheapKV(key, value, 2));
} else {
- encoder.write(createKV(1));
- encoder.write(createKV(0));
- encoder.write(createKV(2));
+ encoder.write(createKV(key, value, 1));
+ encoder.write(createKV(key, value, 0));
+ encoder.write(createKV(key, value, 2));
}
InputStream is = new ByteArrayInputStream(bos.toByteArray());
@@ -101,11 +188,9 @@ public class TestWALCellCodecWithCompression {
assertEquals("tagValue2", Bytes.toString(Tag.cloneValue(tags.get(1))));
}
- private KeyValue createKV(int noOfTags) {
- byte[] row = Bytes.toBytes("myRow");
+ private KeyValue createKV(byte[] row, byte[] value, int noOfTags) {
byte[] cf = Bytes.toBytes("myCF");
byte[] q = Bytes.toBytes("myQualifier");
- byte[] value = Bytes.toBytes("myValue");
List<Tag> tags = new ArrayList<>(noOfTags);
for (int i = 1; i <= noOfTags; i++) {
tags.add(new ArrayBackedTag((byte) i, Bytes.toBytes("tagValue" + i)));
@@ -113,11 +198,9 @@ public class TestWALCellCodecWithCompression {
return new KeyValue(row, cf, q, HConstants.LATEST_TIMESTAMP, value, tags);
}
- private ByteBufferKeyValue createOffheapKV(int noOfTags) {
- byte[] row = Bytes.toBytes("myRow");
+ private ByteBufferKeyValue createOffheapKV(byte[] row, byte[] value, int noOfTags) {
byte[] cf = Bytes.toBytes("myCF");
byte[] q = Bytes.toBytes("myQualifier");
- byte[] value = Bytes.toBytes("myValue");
List<Tag> tags = new ArrayList<>(noOfTags);
for (int i = 1; i <= noOfTags; i++) {
tags.add(new ArrayBackedTag((byte) i, Bytes.toBytes("tagValue" + i)));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayValueCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayValueCompression.java
new file mode 100644
index 0000000..d10cc9c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayValueCompression.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Enables compression and runs the TestWALReplay tests.
+ */
+@Category({ RegionServerTests.class, LargeTests.class })
+public class TestWALReplayValueCompression extends TestWALReplay {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestWALReplayValueCompression.class);
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
+ conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+ conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
+ TestWALReplay.setUpBeforeClass();
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWAL.java
new file mode 100644
index 0000000..6df0d1d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWAL.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestCompressedWAL {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestCompressedWAL.class);
+
+ static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ @Rule
+ public TestName name = new TestName();
+
+ @Parameter
+ public String walProvider;
+
+ @Parameters(name = "{index}: provider={0}")
+ public static Iterable<Object[]> data() {
+ return Arrays.asList(new Object[] { "defaultProvider" }, new Object[] { "asyncfs" });
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ CommonFSUtils.setRootDir(conf, TEST_UTIL.getDataTestDirOnTestFS());
+ TEST_UTIL.startMiniDFSCluster(3);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void setUp() {
+ TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, walProvider);
+ TEST_UTIL.getConfiguration()
+ .setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+ }
+
+ @Test
+ public void testCompressedWAL() throws Exception {
+ TEST_UTIL.getConfiguration()
+ .setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
+ doTest();
+ }
+
+ @Test
+ public void testCompressedWALWithValueCompression() throws Exception {
+ TEST_UTIL.getConfiguration()
+ .setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
+ doTest();
+ }
+
+ private void doTest() throws Exception {
+ TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_"));
+ NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ scopes.put(tableName.getName(), 0);
+ RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
+ final int total = 1000;
+ final byte[] row = Bytes.toBytes("row");
+ final byte[] family = Bytes.toBytes("family");
+ final byte[] value = Bytes.toBytes("Test value");
+ FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
+ final WALFactory wals =
+ new WALFactory(TEST_UTIL.getConfiguration(), tableName.getNameAsString());
+
+ // Write the WAL
+ final WAL wal = wals.getWAL(regionInfo);
+
+ MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+
+ for (int i = 0; i < total; i++) {
+ WALEdit kvs = new WALEdit();
+ kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
+ wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis(), mvcc, scopes), kvs);
+ }
+ wal.sync();
+ final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
+ wals.shutdown();
+
+ // Confirm the WAL can be read back
+ WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath);
+ int count = 0;
+ WAL.Entry entry = new WAL.Entry();
+ while (reader.next(entry) != null) {
+ count++;
+ List<Cell> cells = entry.getEdit().getCells();
+ assertTrue("Should be one KV per WALEdit", cells.size() == 1);
+ for (Cell cell: cells) {
+ assertTrue("Incorrect row", Bytes.equals(cell.getRowArray(),
cell.getRowOffset(),
+ cell.getRowLength(), row, 0, row.length));
+ assertTrue("Incorrect family", Bytes.equals(cell.getFamilyArray(),
cell.getFamilyOffset(),
+ cell.getFamilyLength(), family, 0, family.length));
+ assertTrue("Incorrect value", Bytes.equals(cell.getValueArray(),
cell.getValueOffset(),
+ cell.getValueLength(), value, 0, value.length));
+ }
+ }
+ assertEquals("Should have read back as many KVs as written", total, count);
+ reader.close();
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitValueCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitValueCompression.java
new file mode 100644
index 0000000..32ed85f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitValueCompression.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+@Category({RegionServerTests.class, LargeTests.class})
+public class TestWALSplitValueCompression extends TestWALSplit {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestWALSplitValueCompression.class);
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration()
+ .setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+ TEST_UTIL.getConfiguration()
+ .setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
+ TestWALSplit.setUpBeforeClass();
+ }
+}