bharathv commented on a change in pull request #3244:
URL: https://github.com/apache/hbase/pull/3244#discussion_r630410074
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
##########
@@ -241,10 +245,17 @@ public void write(Cell cell) throws IOException {
compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
PrivateCellUtil.compressQualifier(out, cell,
compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
- // Write timestamp, type and value as uncompressed.
+ // Write timestamp, type and value.
StreamUtils.writeLong(out, cell.getTimestamp());
- out.write(cell.getTypeByte());
- PrivateCellUtil.writeValue(out, cell, cell.getValueLength());
+ byte type = cell.getTypeByte();
Review comment:
nit: remove local variable assignment
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
##########
@@ -349,6 +424,42 @@ private static void checkLength(int len, int max) throws
IOException {
throw new IOException("Invalid length for compresesed portion of
keyvalue: " + len);
}
}
+
+ private void readCompressedValue(InputStream in, byte[] outArray, int
outOffset,
+ int expectedLength) throws IOException {
+ // Read the size of the compressed value. We serialized it as a vint32.
+ int compressedLength = StreamUtils.readRawVarint32(in);
+ // Read all of the compressed value into a buffer for the Inflater.
+ byte[] buffer = new byte[compressedLength];
+ IOUtils.readFully(in, buffer, 0, compressedLength);
+ // Inflate the compressed value. We know the uncompressed size.
Inflator#inflate will
+ // return nonzero for as long as some compressed input remains, and 0
when done.
+ Inflater inflater = compression.getValueCompressor().getInflater();
Review comment:
Same comment as above, use InflaterInputStream?
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
##########
@@ -302,18 +367,28 @@ protected Cell parseCell() throws IOException {
compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
pos += elemLen;
- // timestamp, type and value
- int tsTypeValLen = length - pos;
+ // timestamp
+ long ts = StreamUtils.readLong(in);
+ pos = Bytes.putLong(backingArray, pos, ts);
+ // type and value
+ int typeValLen = length - pos;
if (tagsLength > 0) {
- tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
+ typeValLen = typeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
+ }
+ byte type = (byte)in.read();
+ pos = Bytes.putByte(backingArray, pos, type);
+ int valLen = typeValLen - 1;
+ if (compression.hasValueCompression()) {
Review comment:
Same here on avoiding a branch, should we do that?
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
##########
@@ -256,6 +267,61 @@ public void write(Cell cell) throws IOException {
}
}
}
+
+ private byte[] compressValue(Cell cell) throws IOException {
+ Deflater deflater = compression.getValueCompressor().getDeflater();
+ if (cell instanceof ByteBufferExtendedCell) {
Review comment:
q: Is this special handling to avoid an array copy from the bytebuffer?
(rather than using Cell interface getValueArray() methods). I see this kinda
special handling in multiple places but don't have the context..
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
##########
@@ -256,6 +267,61 @@ public void write(Cell cell) throws IOException {
}
}
}
+
+ private byte[] compressValue(Cell cell) throws IOException {
+ Deflater deflater = compression.getValueCompressor().getDeflater();
+ if (cell instanceof ByteBufferExtendedCell) {
+
deflater.setInput(((ByteBufferExtendedCell)cell).getValueByteBuffer().array(),
+ ((ByteBufferExtendedCell)cell).getValueByteBuffer().arrayOffset() +
+ ((ByteBufferExtendedCell)cell).getValuePosition(),
+ cell.getValueLength());
+ } else {
+ deflater.setInput(cell.getValueArray(), cell.getValueOffset(),
cell.getValueLength());
+ }
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ int bufferSize = 1024;
+ byte[] buffer = new byte[bufferSize];
+ // Deflater#deflate will return 0 only if more input is required. We
iterate until
+ // that condition is met, sending the content of 'buffer' to the output
stream at
+ // each step, until deflate returns 0. Then the compressor must be
flushed in order
+ // for all of the value's output to be written into the corresponding
edit. (Otherwise
+ // the compressor would carry over some of the output for this value
into the output
+ // of the next.) To flush the compressor we call deflate again using the
method option
+ // that allows us to specify the SYNC_FLUSH flag. The sync output will
be placed into
+ // the buffer. When flushing we iterate until there is no more output.
Then the flush
+ // is complete and the compressor is ready for more input.
+ int bytesOut;
Review comment:
Why not use DeflateInputStream for this? That hides most of the
complexity here for us..
```
ByteArrayOutputStream bos = new ByteArrayOutputStream(input.length);
DeflaterOutputStream stream = new DeflaterOutputStream(bos, deflater);
try {
stream.write(input);
stream.close();
} catch (IOException e) {
throw
}
return bos.toByteArray();
```
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
##########
@@ -241,10 +245,17 @@ public void write(Cell cell) throws IOException {
compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
PrivateCellUtil.compressQualifier(out, cell,
compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
- // Write timestamp, type and value as uncompressed.
+ // Write timestamp, type and value.
StreamUtils.writeLong(out, cell.getTimestamp());
- out.write(cell.getTypeByte());
- PrivateCellUtil.writeValue(out, cell, cell.getValueLength());
+ byte type = cell.getTypeByte();
+ out.write(type);
+ if (compression.getValueCompressor() != null) {
Review comment:
Should we consider eliminating this branch since this is known at init
time and this code path is perf critical?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]