apurtell commented on a change in pull request #3244:
URL: https://github.com/apache/hbase/pull/3244#discussion_r634841914



##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
##########
@@ -18,37 +18,117 @@
 
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.EnumMap;
 import java.util.Map;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.io.DelegatingInputStream;
 import org.apache.hadoop.hbase.io.TagCompressionContext;
+import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.util.Dictionary;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * Context that holds the various dictionaries for compression in WAL.
  */
 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, 
HBaseInterfaceAudience.PHOENIX})
 public class CompressionContext {
 
-  static final String ENABLE_WAL_TAGS_COMPRESSION =
-      "hbase.regionserver.wal.tags.enablecompression";
+  public static final String ENABLE_WAL_TAGS_COMPRESSION =
+    "hbase.regionserver.wal.tags.enablecompression";
+
+  public static final String ENABLE_WAL_VALUE_COMPRESSION =
+    "hbase.regionserver.wal.value.enablecompression";
+
+  public static final String WAL_VALUE_COMPRESSION_TYPE =
+    "hbase.regionserver.wal.value.compression.type";
 
   public enum DictionaryIndex {
     REGION, TABLE, FAMILY, QUALIFIER, ROW
   }
 
+  /**
+   * Encapsulates the compression algorithm and its streams that we will use 
for value
+   * compression in this WAL.
+   */
+  static class ValueCompressor {
+  
+    static final int IO_BUFFER_SIZE = 4096;
+
+    private final Compression.Algorithm algorithm;
+    private DelegatingInputStream lowerIn;
+    private ByteArrayOutputStream lowerOut;
+    private InputStream compressedIn;
+    private OutputStream compressedOut;
+
+    public ValueCompressor(Compression.Algorithm algorithm) throws IOException 
{
+      this.algorithm = algorithm;
+    }
+
+    public Compression.Algorithm getAlgorithm() {
+      return algorithm;
+    }
+
+    public byte[] compress(byte[] valueArray, int valueOffset, int valueLength)
+        throws IOException {
+      // We have to create the output streams here the first time around.
+      if (compressedOut == null) {
+        lowerOut = new ByteArrayOutputStream();
+        compressedOut = algorithm.createCompressionStream(lowerOut, 
algorithm.getCompressor(),
+          IO_BUFFER_SIZE);
+      } else {
+        lowerOut.reset();
+      }
+      compressedOut.write(valueArray, valueOffset, valueLength);
+      compressedOut.flush();
+      return lowerOut.toByteArray();
+    }
+
+    public int decompress(InputStream in, int inLength, byte[] outArray, int 
outOffset,
+        int outLength) throws IOException {
+      // Read all of the compressed bytes into a buffer.
+      byte[] inBuffer = new byte[inLength];

Review comment:
       This is done so we handle input as a sequence of byte[] arrays (call 
them _segments_), with ByteArrayInputStream providing the appropriate signals 
to the upper decompression stream as to when input in the current segment is 
fully consumed. 
   
   Originally I looked at using `BoundedInputStream` but you can't reuse the 
same instance to feed the compression input stream more input in another 
segment, and we can't just create a new compression input stream because that 
would reset compression context and fail to decompress the rest of the file. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to