ndimiduk commented on a change in pull request #3244:
URL: https://github.com/apache/hbase/pull/3244#discussion_r633805051



##########
File path: hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto
##########
@@ -32,6 +32,8 @@ message WALHeader {
   optional bool has_tag_compression = 3;
   optional string writer_cls_name = 4;
   optional string cell_codec_cls_name = 5;
+  optional bool has_value_compression = 6;
+  optional uint32 value_compression_codec = 7;

Review comment:
       nit: call this attribute `value_compression_type` so that it matches the 
field in `AbstractProtobufLogWriter`, or rename that field to 
`valueCompressionCodec` so that it matches this attribute. Or call it 
`value_compression_algorithm` and update the property name down in 
`ReaderBase`, so that everything aligns with the name of the Java type, which 
is a `o.a.h.h.io.compress.Compression$Algorithm`.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
##########
@@ -165,10 +178,20 @@ public void init(FileSystem fs, Path path, Configuration conf, boolean overwrita
 
     initOutput(fs, path, overwritable, bufferSize, replication, blocksize);
 
-    boolean doTagCompress = doCompress
-        && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
-    length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, buildWALHeader(conf,
-      WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress))));
+    boolean doTagCompress = doCompress &&

Review comment:
       nit: Duplication of config parsing between `init` and `initializeCompressionContext` is bizarre; we literally just did this work 8 lines earlier. For future cleanup, I suppose.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
##########
@@ -256,20 +264,31 @@ public void write(Cell cell) throws IOException {
         }
       }
     }
+
+    private void writeCompressedValue(OutputStream out, Cell cell) throws IOException {
+      byte[] compressed = compression.getValueCompressor().compress(cell.getValueArray(),
+        cell.getValueOffset(), cell.getValueLength());
+      StreamUtils.writeRawVInt32(out, compressed.length);
+      out.write(compressed);
+    }
+
   }
 
   static class CompressedKvDecoder extends BaseDecoder {
     private final CompressionContext compression;
+    private final boolean hasValueCompression;
+    private final boolean hasTagCompression;
    public CompressedKvDecoder(InputStream in, CompressionContext compression) {
       super(in);
       this.compression = compression;
+      this.hasValueCompression = compression.hasValueCompression();
+      this.hasTagCompression = compression.hasTagCompression();
     }
 
     @Override
     protected Cell parseCell() throws IOException {

Review comment:
       Are similar updates needed in `SecureWALCellCodec`? My IDE is telling me that these methods have a fair bit of duplicate code between the two classes.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
##########
@@ -34,21 +36,49 @@
 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
 public class CompressionContext {
 
-  static final String ENABLE_WAL_TAGS_COMPRESSION =
+  public static final String ENABLE_WAL_TAGS_COMPRESSION =

Review comment:
       So you decided not to move `TestWALSplitValueCompression` into the same package as this class?
   
   It looks like:
   * `ENABLE_WAL_TAGS_COMPRESSION` can be package private as is.
   * `ENABLE_WAL_VALUE_COMPRESSION` can be package private if `TestWALReplayValueCompression` is moved to `o.a.h.h.regionserver.wal`, and it moves trivially.
   * `WAL_VALUE_COMPRESSION_TYPE` isn't used anywhere else so it can be `private`.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
##########
@@ -151,6 +159,16 @@ public void seek(long pos) throws IOException {
    */
   protected abstract boolean hasTagCompression();
 
+  /**
+   * @return Whether value compression is enabled for this log.
+   */
+  protected abstract boolean hasValueCompression();
+
+  /**
+   * @return Value compression algorithm for this log.
+   */
+  protected abstract Compression.Algorithm getValueCompressionType();

Review comment:
       `getValueCompressionAlgorithm` ?

##########
File path: hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java
##########
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * An input stream that delegates all operations to another input stream.
+ * The delegate can be switched out for another at any time but to minimize the
+ * possibility of violating the InputStream contract it would be best to replace
+ * the delegate only once it has been fully consumed. <p> For example, a
+ * ByteArrayInputStream, which is implicitly bounded by the size of the underlying
+ * byte array can be converted into an unbounded stream fed by multiple instances
+ * of ByteArrayInputStream, switched out one for the other in sequence.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class DelegatingInputStream extends InputStream {
+
+  InputStream lowerStream;

Review comment:
       Are you making any claims about using this class in a multi-threaded 
environment? For instance, should `lowerStream` be marked `volatile` to ensure 
that multiple consumers of an instance of this class linearize their access to 
the field? 
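
   For context, a minimal sketch of what a `volatile` delegate field buys (class and method names below are illustrative stand-ins, not the patch's actual code): a reader thread is guaranteed to observe the most recently installed delegate, though compound read-then-swap sequences would still need external synchronization.

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.IOException;
   import java.io.InputStream;

   // Hypothetical variant with a volatile delegate for cross-thread visibility.
   class VolatileDelegatingInputStream extends InputStream {
     private volatile InputStream delegate;

     VolatileDelegatingInputStream(InputStream delegate) {
       this.delegate = delegate;
     }

     void setDelegate(InputStream delegate) {
       this.delegate = delegate;
     }

     @Override
     public int read() throws IOException {
       return delegate.read();
     }

     @Override
     public int read(byte[] b, int off, int len) throws IOException {
       return delegate.read(b, off, len);
     }

     public static void main(String[] args) throws IOException {
       VolatileDelegatingInputStream in =
           new VolatileDelegatingInputStream(new ByteArrayInputStream("abc".getBytes()));
       byte[] buf = new byte[3];
       in.read(buf, 0, 3);
       System.out.println(new String(buf));
       // Swap the delegate once the first segment is fully consumed.
       in.setDelegate(new ByteArrayInputStream("def".getBytes()));
       in.read(buf, 0, 3);
       System.out.println(new String(buf));
     }
   }
   ```

   Note `volatile` only addresses visibility of the field itself; it does not make the swap atomic with respect to an in-flight `read`.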

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
##########
@@ -18,37 +18,117 @@
 
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.EnumMap;
 import java.util.Map;
 

Review comment:
       delete this newline and you will have completely cleaned up the import checkstyle warnings. Also, if you were unaware, you can have IDEA do this for you automatically when you "Optimize Imports" by using the checkstyle plugin and importing our checkstyle configuration settings. I documented this a while back; see [our book](http://hbase.apache.org/book.html#_intellij_idea).

##########
File path: hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java
##########
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * An input stream that delegates all operations to another input stream.
+ * The delegate can be switched out for another at any time but to minimize the
+ * possibility of violating the InputStream contract it would be best to replace
+ * the delegate only once it has been fully consumed. <p> For example, a
+ * ByteArrayInputStream, which is implicitly bounded by the size of the underlying
+ * byte array can be converted into an unbounded stream fed by multiple instances
+ * of ByteArrayInputStream, switched out one for the other in sequence.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving

Review comment:
       You can omit `Evolving` as it is redundant to `Private`; `Private` makes no compatibility guarantees version-to-version, and `Evolving` is an explicit marker for breaking compatibility at minor releases.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
##########
@@ -70,18 +150,55 @@ public CompressionContext(Class<? extends Dictionary> dictType, boolean recovere
     if (hasTagCompression) {
       tagCompressionContext = new TagCompressionContext(dictType, Short.MAX_VALUE);
     }
+    if (hasValueCompression && valueCompressionType != null) {
+      valueCompressor = new ValueCompressor(valueCompressionType);
+    }
+  }
+
+  public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits,
+      boolean hasTagCompression)
+      throws SecurityException, NoSuchMethodException, InstantiationException,
+        IllegalAccessException, InvocationTargetException, IOException {
+    this(dictType, recoveredEdits, hasTagCompression, false, null);
+  }
+
+  public boolean hasTagCompression() {
+    return tagCompressionContext != null;
+  }
+
+  public boolean hasValueCompression() {
+    return valueCompressor != null;
   }
 
-  public Dictionary getDictionary(Enum dictIndex) {
+  public Dictionary getDictionary(Enum<DictionaryIndex> dictIndex) {

Review comment:
       Drop `Enum<...>` and make this simply `Dictionary getDictionary(DictionaryIndex dictIndex)`
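
   A small sketch of why the concrete enum parameter is preferable (class and field names here are illustrative stand-ins, not the actual patch code): the compiler rejects any other enum type at the call site, and the implementation can use an `EnumMap` keyed on the concrete type.

   ```java
   import java.util.EnumMap;
   import java.util.Map;

   // Hypothetical registry demonstrating the suggested signature shape.
   class DictionaryRegistry {
     enum DictionaryIndex { REGION, TABLE, FAMILY, QUALIFIER, ROW }

     // EnumMap requires the concrete enum class, which the typed parameter guarantees.
     private final Map<DictionaryIndex, String> dictionaries =
         new EnumMap<>(DictionaryIndex.class);

     // Suggested shape: the parameter is DictionaryIndex, not Enum<DictionaryIndex>.
     String getDictionary(DictionaryIndex index) {
       return dictionaries.computeIfAbsent(index, i -> "dict-" + i.name());
     }

     public static void main(String[] args) {
       DictionaryRegistry registry = new DictionaryRegistry();
       System.out.println(registry.getDictionary(DictionaryIndex.REGION));
       // registry.getDictionary(SomeOtherEnum.FOO); // would not compile
     }
   }
   ```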

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
##########
@@ -18,37 +18,117 @@
 
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.EnumMap;
 import java.util.Map;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.io.DelegatingInputStream;
 import org.apache.hadoop.hbase.io.TagCompressionContext;
+import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.util.Dictionary;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * Context that holds the various dictionaries for compression in WAL.
  */
 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
 public class CompressionContext {
 
-  static final String ENABLE_WAL_TAGS_COMPRESSION =
-      "hbase.regionserver.wal.tags.enablecompression";
+  public static final String ENABLE_WAL_TAGS_COMPRESSION =
+    "hbase.regionserver.wal.tags.enablecompression";
+
+  public static final String ENABLE_WAL_VALUE_COMPRESSION =
+    "hbase.regionserver.wal.value.enablecompression";
+
+  public static final String WAL_VALUE_COMPRESSION_TYPE =
+    "hbase.regionserver.wal.value.compression.type";
 
   public enum DictionaryIndex {
     REGION, TABLE, FAMILY, QUALIFIER, ROW
   }
 
+  /**
+   * Encapsulates the compression algorithm and its streams that we will use for value
+   * compression in this WAL.
+   */
+  static class ValueCompressor {
+  
+    static final int IO_BUFFER_SIZE = 4096;
+
+    private final Compression.Algorithm algorithm;
+    private DelegatingInputStream lowerIn;
+    private ByteArrayOutputStream lowerOut;
+    private InputStream compressedIn;
+    private OutputStream compressedOut;
+
+    public ValueCompressor(Compression.Algorithm algorithm) throws IOException {

Review comment:
       `IOException` isn't thrown.

##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitValueCompression.java
##########
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;

Review comment:
       Should be in `o.a.h.h.regionserver.wal` ?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
##########
@@ -18,37 +18,117 @@
 
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.EnumMap;
 import java.util.Map;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.io.DelegatingInputStream;
 import org.apache.hadoop.hbase.io.TagCompressionContext;
+import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.util.Dictionary;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * Context that holds the various dictionaries for compression in WAL.
  */
 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
 public class CompressionContext {
 
-  static final String ENABLE_WAL_TAGS_COMPRESSION =
-      "hbase.regionserver.wal.tags.enablecompression";
+  public static final String ENABLE_WAL_TAGS_COMPRESSION =
+    "hbase.regionserver.wal.tags.enablecompression";
+
+  public static final String ENABLE_WAL_VALUE_COMPRESSION =
+    "hbase.regionserver.wal.value.enablecompression";
+
+  public static final String WAL_VALUE_COMPRESSION_TYPE =
+    "hbase.regionserver.wal.value.compression.type";
 
   public enum DictionaryIndex {
     REGION, TABLE, FAMILY, QUALIFIER, ROW
   }
 
+  /**
+   * Encapsulates the compression algorithm and its streams that we will use 
for value
+   * compression in this WAL.
+   */
+  static class ValueCompressor {
+  
+    static final int IO_BUFFER_SIZE = 4096;
+
+    private final Compression.Algorithm algorithm;
+    private DelegatingInputStream lowerIn;
+    private ByteArrayOutputStream lowerOut;
+    private InputStream compressedIn;
+    private OutputStream compressedOut;
+
+    public ValueCompressor(Compression.Algorithm algorithm) throws IOException {
+      this.algorithm = algorithm;
+    }
+
+    public Compression.Algorithm getAlgorithm() {
+      return algorithm;
+    }
+
+    public byte[] compress(byte[] valueArray, int valueOffset, int valueLength)
+        throws IOException {
+      // We have to create the output streams here the first time around.
+      if (compressedOut == null) {
+        lowerOut = new ByteArrayOutputStream();
+        compressedOut = algorithm.createCompressionStream(lowerOut, algorithm.getCompressor(),
+          IO_BUFFER_SIZE);
+      } else {
+        lowerOut.reset();
+      }
+      compressedOut.write(valueArray, valueOffset, valueLength);
+      compressedOut.flush();
+      return lowerOut.toByteArray();
+    }
+
+    public int decompress(InputStream in, int inLength, byte[] outArray, int outOffset,
+        int outLength) throws IOException {
+      // Read all of the compressed bytes into a buffer.
+      byte[] inBuffer = new byte[inLength];
+      IOUtils.readFully(in, inBuffer);
+      // We have to create the input streams here the first time around.
+      if (compressedIn == null) {
+        lowerIn = new DelegatingInputStream(new ByteArrayInputStream(inBuffer));
+        compressedIn = algorithm.createDecompressionStream(lowerIn, algorithm.getDecompressor(),
+          IO_BUFFER_SIZE);
+      } else {
+        lowerIn.setDelegate(new ByteArrayInputStream(inBuffer));
+      }
+      return compressedIn.read(outArray, outOffset, outLength);
+    }
+
+    public void clear() {
+      lowerIn = null;
+      compressedIn = null;
+      lowerOut = null;
+      compressedOut = null;
+    }
+
+  };

Review comment:
       unnecessary `;`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

