apurtell commented on a change in pull request #3244:
URL: https://github.com/apache/hbase/pull/3244#discussion_r634845166



##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
##########
@@ -18,37 +18,117 @@
 
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.EnumMap;
 import java.util.Map;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.io.DelegatingInputStream;
 import org.apache.hadoop.hbase.io.TagCompressionContext;
+import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.util.Dictionary;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * Context that holds the various dictionaries for compression in WAL.
  */
 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, 
HBaseInterfaceAudience.PHOENIX})
 public class CompressionContext {
 
-  static final String ENABLE_WAL_TAGS_COMPRESSION =
-      "hbase.regionserver.wal.tags.enablecompression";
+  public static final String ENABLE_WAL_TAGS_COMPRESSION =
+    "hbase.regionserver.wal.tags.enablecompression";
+
+  public static final String ENABLE_WAL_VALUE_COMPRESSION =
+    "hbase.regionserver.wal.value.enablecompression";
+
+  public static final String WAL_VALUE_COMPRESSION_TYPE =
+    "hbase.regionserver.wal.value.compression.type";
 
   public enum DictionaryIndex {
     REGION, TABLE, FAMILY, QUALIFIER, ROW
   }
 
+  /**
+   * Encapsulates the compression algorithm and its streams that we will use 
for value
+   * compression in this WAL.
+   */
+  static class ValueCompressor {
+  
+    static final int IO_BUFFER_SIZE = 4096;
+
+    private final Compression.Algorithm algorithm;
+    private DelegatingInputStream lowerIn;
+    private ByteArrayOutputStream lowerOut;
+    private InputStream compressedIn;
+    private OutputStream compressedOut;
+
+    public ValueCompressor(Compression.Algorithm algorithm) throws IOException 
{
+      this.algorithm = algorithm;
+    }
+
+    public Compression.Algorithm getAlgorithm() {
+      return algorithm;
+    }
+
+    public byte[] compress(byte[] valueArray, int valueOffset, int valueLength)
+        throws IOException {
+      // We have to create the output streams here the first time around.
+      if (compressedOut == null) {
+        lowerOut = new ByteArrayOutputStream();
+        compressedOut = algorithm.createCompressionStream(lowerOut, 
algorithm.getCompressor(),
+          IO_BUFFER_SIZE);
+      } else {
+        lowerOut.reset();
+      }
+      compressedOut.write(valueArray, valueOffset, valueLength);
+      compressedOut.flush();
+      return lowerOut.toByteArray();
+    }
+
+    public int decompress(InputStream in, int inLength, byte[] outArray, int 
outOffset,
+        int outLength) throws IOException {
+      // Read all of the compressed bytes into a buffer.
+      byte[] inBuffer = new byte[inLength];

Review comment:
       You can think of all values in a WAL file where value compression is 
enabled as chunks of the output of a single compression stream. The input of 
this stream is the concatenation of all of the cell value data that is being 
written into the WAL. The output of this stream is compressed cell value data, 
stored where uncompressed value data would otherwise be stored. We flush the 
compression stream at every WALEdit, so there will never be a short read at 
read time, when we are processing the stream of compressed bits with the 
decompressor. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to