Refactor Snappy to use the same decompression code as LZ4
Project: http://git-wip-us.apache.org/repos/asf/commons-compress/repo Commit: http://git-wip-us.apache.org/repos/asf/commons-compress/commit/a3620e04 Tree: http://git-wip-us.apache.org/repos/asf/commons-compress/tree/a3620e04 Diff: http://git-wip-us.apache.org/repos/asf/commons-compress/diff/a3620e04 Branch: refs/heads/master Commit: a3620e04e4187c3de15115ef33fb148e4fb29424 Parents: db53703 Author: Stefan Bodewig <[email protected]> Authored: Tue Jan 17 21:44:01 2017 +0100 Committer: Stefan Bodewig <[email protected]> Committed: Tue Jan 17 21:44:01 2017 +0100 ---------------------------------------------------------------------- .../snappy/SnappyCompressorInputStream.java | 259 ++++--------------- .../FramedSnappyCompressorInputStreamTest.java | 3 +- .../compressors/snappy/SnappyRoundtripTest.java | 2 + 3 files changed, 57 insertions(+), 207 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/commons-compress/blob/a3620e04/src/main/java/org/apache/commons/compress/compressors/snappy/SnappyCompressorInputStream.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/commons/compress/compressors/snappy/SnappyCompressorInputStream.java b/src/main/java/org/apache/commons/compress/compressors/snappy/SnappyCompressorInputStream.java index 6b1f1d8..39b484f 100644 --- a/src/main/java/org/apache/commons/compress/compressors/snappy/SnappyCompressorInputStream.java +++ b/src/main/java/org/apache/commons/compress/compressors/snappy/SnappyCompressorInputStream.java @@ -21,7 +21,7 @@ package org.apache.commons.compress.compressors.snappy; import java.io.IOException; import java.io.InputStream; -import org.apache.commons.compress.compressors.CompressorInputStream; +import org.apache.commons.compress.compressors.lz77support.AbstractLZ77CompressorInputStream; import org.apache.commons.compress.utils.ByteUtils; import 
org.apache.commons.compress.utils.IOUtils; @@ -39,7 +39,7 @@ import org.apache.commons.compress.utils.IOUtils; * @see <a href="https://github.com/google/snappy/blob/master/format_description.txt">Snappy compressed format description</a> * @since 1.7 */ -public class SnappyCompressorInputStream extends CompressorInputStream { +public class SnappyCompressorInputStream extends AbstractLZ77CompressorInputStream { /** Mask used to determine the type of "tag" is being processed */ private static final int TAG_MASK = 0x03; @@ -47,39 +47,17 @@ public class SnappyCompressorInputStream extends CompressorInputStream { /** Default block size */ public static final int DEFAULT_BLOCK_SIZE = 32768; - /** Buffer to write decompressed bytes to for back-references */ - private final byte[] decompressBuf; - - /** One behind the index of the last byte in the buffer that was written */ - private int writeIndex; - - /** Index of the next byte to be read. */ - private int readIndex; - - /** The actual block size specified */ - private final int blockSize; - - /** The underlying stream to read compressed data from */ - private final InputStream in; - /** The size of the uncompressed data */ private final int size; /** Number of uncompressed bytes still to be read. */ private int uncompressedBytesRemaining; - // used in no-arg read method - private final byte[] oneByte = new byte[1]; + /** Current state of the stream */ + private State state = State.NO_BLOCK; private boolean endReached = false; - private final ByteUtils.ByteSupplier supplier = new ByteUtils.ByteSupplier() { - @Override - public int getAsByte() throws IOException { - return readOneByte(); - } - }; - /** * Constructor using the default buffer size of 32k. 
* @@ -104,31 +82,10 @@ public class SnappyCompressorInputStream extends CompressorInputStream { */ public SnappyCompressorInputStream(final InputStream is, final int blockSize) throws IOException { - this.in = is; - this.blockSize = blockSize; - this.decompressBuf = new byte[blockSize * 3]; - this.writeIndex = readIndex = 0; + super(is, blockSize); uncompressedBytesRemaining = size = (int) readSize(); } - /** {@inheritDoc} */ - @Override - public int read() throws IOException { - return read(oneByte, 0, 1) == -1 ? -1 : oneByte[0] & 0xFF; - } - - /** {@inheritDoc} */ - @Override - public void close() throws IOException { - in.close(); - } - - /** {@inheritDoc} */ - @Override - public int available() { - return writeIndex - readIndex; - } - /** * {@inheritDoc} */ @@ -137,21 +94,25 @@ public class SnappyCompressorInputStream extends CompressorInputStream { if (endReached) { return -1; } - final int avail = available(); - if (len > avail) { - fill(len - avail); - } - - final int readable = Math.min(len, available()); - if (readable == 0 && len > 0) { - return -1; - } - System.arraycopy(decompressBuf, readIndex, b, off, readable); - readIndex += readable; - if (readIndex > 2 * blockSize) { - slideBuffer(); + switch (state) { + case NO_BLOCK: + fill(len); + return read(b, off, len); + case IN_LITERAL: + int litLen = readLiteral(b, off, len); + if (!hasMoreDataInBlock()) { + state = State.NO_BLOCK; + } + return litLen; + case IN_COPY: + int copyLen = readCopy(b, off, len); + if (!hasMoreDataInBlock()) { + state = State.NO_BLOCK; + } + return copyLen; + default: + throw new IOException("Unknown stream state " + state); } - return readable; } /** @@ -163,13 +124,15 @@ public class SnappyCompressorInputStream extends CompressorInputStream { private void fill(final int len) throws IOException { if (uncompressedBytesRemaining == 0) { endReached = true; + return; } - int readNow = Math.min(len, uncompressedBytesRemaining); - while (readNow > 0) { - final int b = readOneByte(); 
+ int b = readOneByte(); + if (b == -1) { + throw new IOException("Premature end of stream reading block start"); + } int length = 0; - long offset = 0; + int offset = 0; switch (b & TAG_MASK) { @@ -177,10 +140,8 @@ public class SnappyCompressorInputStream extends CompressorInputStream { length = readLiteralLength(b); uncompressedBytesRemaining -= length; - - if (expandLiteral(length)) { - return; - } + startLiteral(length); + state = State.IN_LITERAL; break; case 0x01: @@ -197,11 +158,14 @@ public class SnappyCompressorInputStream extends CompressorInputStream { length = 4 + ((b >> 2) & 0x07); uncompressedBytesRemaining -= length; offset = (b & 0xE0) << 3; - offset |= readOneByte(); - - if (expandCopy(offset, length)) { - return; + b = readOneByte(); + if (b == -1) { + throw new IOException("Premature end of stream reading copy length"); } + offset |= b; + + startCopy(offset, length); + state = State.IN_COPY; break; case 0x02: @@ -217,11 +181,10 @@ public class SnappyCompressorInputStream extends CompressorInputStream { length = (b >> 2) + 1; uncompressedBytesRemaining -= length; - offset = ByteUtils.fromLittleEndian(supplier, 2); + offset = (int) ByteUtils.fromLittleEndian(supplier, 2); - if (expandCopy(offset, length)) { - return; - } + startCopy(offset, length); + state = State.IN_COPY; break; case 0x03: @@ -236,32 +199,14 @@ public class SnappyCompressorInputStream extends CompressorInputStream { length = (b >> 2) + 1; uncompressedBytesRemaining -= length; - offset = ByteUtils.fromLittleEndian(supplier, 4); + offset = (int) ByteUtils.fromLittleEndian(supplier, 4) & 0x7fffffff; - if (expandCopy(offset, length)) { - return; - } + startCopy(offset, length); + state = State.IN_COPY; break; } - - readNow -= length; - } } - /** - * Slide buffer. 
- * - * <p>Move all bytes of the buffer after the first block down to - * the beginning of the buffer.</p> - */ - private void slideBuffer() { - System.arraycopy(decompressBuf, blockSize, decompressBuf, 0, - blockSize * 2); - writeIndex -= blockSize; - readIndex -= blockSize; - } - - /* * For literals up to and including 60 bytes in length, the * upper six bits of the tag byte contain (len-1). The literal @@ -277,6 +222,9 @@ public class SnappyCompressorInputStream extends CompressorInputStream { switch (b >> 2) { case 60: length = readOneByte(); + if (length == -1) { + throw new IOException("Premature end of stream reading literal length"); + } break; case 61: length = (int) ByteUtils.fromLittleEndian(supplier, 2); @@ -296,112 +244,6 @@ public class SnappyCompressorInputStream extends CompressorInputStream { } /** - * Literals are uncompressed data stored directly in the byte stream. - * - * @param length - * The number of bytes to read from the underlying stream - * - * @throws IOException - * If the first byte cannot be read for any reason other than - * end of file, or if the input stream has been closed, or if - * some other I/O error occurs. - * @return True if the decompressed data should be flushed - */ - private boolean expandLiteral(final int length) throws IOException { - boolean shouldFlush = ensureBufferSpace(length); - final int bytesRead = IOUtils.readFully(in, decompressBuf, writeIndex, length); - count(bytesRead); - if (length != bytesRead) { - throw new IOException("Premature end of stream"); - } - - writeIndex += length; - return shouldFlush || writeIndex >= 2 * this.blockSize; - } - - private boolean ensureBufferSpace(final int length) { - if (writeIndex + length >= decompressBuf.length) { - slideBuffer(); - return true; - } - return false; - } - - /** - * Copies are references back into previous decompressed data, telling the - * decompressor to reuse data it has previously decoded. 
They encode two - * values: The offset, saying how many bytes back from the current position - * to read, and the length, how many bytes to copy. Offsets of zero can be - * encoded, but are not legal; similarly, it is possible to encode - * backreferences that would go past the end of the block (offset > current - * decompressed position), which is also nonsensical and thus not allowed. - * - * @param off - * The offset from the backward from the end of expanded stream - * @param length - * The number of bytes to copy - * - * @throws IOException - * An the offset expands past the front of the decompression - * buffer - * @return True if the decompressed data should be flushed - */ - private boolean expandCopy(final long off, final int length) throws IOException { - if (off > blockSize) { - throw new IOException("Offset " + off + " is larger than block size " + blockSize); - } - final int offset = (int) off; - boolean shouldFlush = ensureBufferSpace(length); - - if (offset == 1) { - final byte lastChar = decompressBuf[writeIndex - 1]; - for (int i = 0; i < length; i++) { - decompressBuf[writeIndex++] = lastChar; - } - } else if (length < offset) { - System.arraycopy(decompressBuf, writeIndex - offset, - decompressBuf, writeIndex, length); - writeIndex += length; - } else { - int fullRotations = length / offset; - final int pad = length - (offset * fullRotations); - - while (fullRotations-- != 0) { - System.arraycopy(decompressBuf, writeIndex - offset, - decompressBuf, writeIndex, offset); - writeIndex += offset; - } - - if (pad > 0) { - System.arraycopy(decompressBuf, writeIndex - offset, - decompressBuf, writeIndex, pad); - - writeIndex += pad; - } - } - return shouldFlush || writeIndex >= 2 * this.blockSize; - } - - /** - * This helper method reads the next byte of data from the input stream. The - * value byte is returned as an <code>int</code> in the range <code>0</code> - * to <code>255</code>. 
If no byte is available because the end of the - * stream has been reached, an Exception is thrown. - * - * @return The next byte of data - * @throws IOException - * EOF is reached or error reading the stream - */ - private int readOneByte() throws IOException { - final int b = in.read(); - if (b == -1) { - throw new IOException("Premature end of stream"); - } - count(1); - return b & 0xFF; - } - - /** * The stream starts with the uncompressed length (up to a maximum of 2^32 - * 1), stored as a little-endian varint. Varints consist of a series of * bytes, where the lower 7 bits are data and the upper bit is set iff there @@ -421,6 +263,9 @@ public class SnappyCompressorInputStream extends CompressorInputStream { do { b = readOneByte(); + if (b == -1) { + throw new IOException("Premature end of stream reading size"); + } sz |= (b & 0x7f) << (index++ * 7); } while (0 != (b & 0x80)); return sz; @@ -431,8 +276,12 @@ public class SnappyCompressorInputStream extends CompressorInputStream { * * @return the uncompressed size */ + @Override public int getSize() { return size; } + private enum State { + NO_BLOCK, IN_LITERAL, IN_COPY + } } http://git-wip-us.apache.org/repos/asf/commons-compress/blob/a3620e04/src/test/java/org/apache/commons/compress/compressors/snappy/FramedSnappyCompressorInputStreamTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/commons/compress/compressors/snappy/FramedSnappyCompressorInputStreamTest.java b/src/test/java/org/apache/commons/compress/compressors/snappy/FramedSnappyCompressorInputStreamTest.java index 0f07041..cac4b2c 100644 --- a/src/test/java/org/apache/commons/compress/compressors/snappy/FramedSnappyCompressorInputStreamTest.java +++ b/src/test/java/org/apache/commons/compress/compressors/snappy/FramedSnappyCompressorInputStreamTest.java @@ -119,10 +119,9 @@ public final class FramedSnappyCompressorInputStreamTest assertEquals(3, in.available()); // remainder of first 
uncompressed block assertEquals(3, in.read(new byte[5], 0, 3)); assertEquals('5', in.read()); - assertEquals(4, in.available()); // remainder of literal + assertEquals(0, in.available()); // end of chunk, must read next one assertEquals(4, in.read(new byte[5], 0, 4)); assertEquals('5', in.read()); - assertEquals(19, in.available()); // remainder of copy in.close(); } } http://git-wip-us.apache.org/repos/asf/commons-compress/blob/a3620e04/src/test/java/org/apache/commons/compress/compressors/snappy/SnappyRoundtripTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/commons/compress/compressors/snappy/SnappyRoundtripTest.java b/src/test/java/org/apache/commons/compress/compressors/snappy/SnappyRoundtripTest.java index b626b17..481d07c 100644 --- a/src/test/java/org/apache/commons/compress/compressors/snappy/SnappyRoundtripTest.java +++ b/src/test/java/org/apache/commons/compress/compressors/snappy/SnappyRoundtripTest.java @@ -46,6 +46,7 @@ public final class SnappyRoundtripTest extends AbstractTestCase { } System.err.println(input.getName() + " written, uncompressed bytes: " + input.length() + ", compressed bytes: " + outputSz.length() + " after " + (System.currentTimeMillis() - start) + "ms"); + start = System.currentTimeMillis(); try (FileInputStream is = new FileInputStream(input); SnappyCompressorInputStream sis = new SnappyCompressorInputStream(new FileInputStream(outputSz), params.getWindowSize())) { @@ -53,6 +54,7 @@ public final class SnappyRoundtripTest extends AbstractTestCase { byte[] actual = IOUtils.toByteArray(sis); Assert.assertArrayEquals(expected, actual); } + System.err.println(outputSz.getName() + " read after " + (System.currentTimeMillis() - start) + "ms"); } // should yield decent compression
