Refactor Snappy to use the same decompression code as LZ4

Project: http://git-wip-us.apache.org/repos/asf/commons-compress/repo
Commit: http://git-wip-us.apache.org/repos/asf/commons-compress/commit/a3620e04
Tree: http://git-wip-us.apache.org/repos/asf/commons-compress/tree/a3620e04
Diff: http://git-wip-us.apache.org/repos/asf/commons-compress/diff/a3620e04

Branch: refs/heads/master
Commit: a3620e04e4187c3de15115ef33fb148e4fb29424
Parents: db53703
Author: Stefan Bodewig <[email protected]>
Authored: Tue Jan 17 21:44:01 2017 +0100
Committer: Stefan Bodewig <[email protected]>
Committed: Tue Jan 17 21:44:01 2017 +0100

----------------------------------------------------------------------
 .../snappy/SnappyCompressorInputStream.java     | 259 ++++---------------
 .../FramedSnappyCompressorInputStreamTest.java  |   3 +-
 .../compressors/snappy/SnappyRoundtripTest.java |   2 +
 3 files changed, 57 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/commons-compress/blob/a3620e04/src/main/java/org/apache/commons/compress/compressors/snappy/SnappyCompressorInputStream.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/commons/compress/compressors/snappy/SnappyCompressorInputStream.java b/src/main/java/org/apache/commons/compress/compressors/snappy/SnappyCompressorInputStream.java
index 6b1f1d8..39b484f 100644
--- a/src/main/java/org/apache/commons/compress/compressors/snappy/SnappyCompressorInputStream.java
+++ b/src/main/java/org/apache/commons/compress/compressors/snappy/SnappyCompressorInputStream.java
@@ -21,7 +21,7 @@ package org.apache.commons.compress.compressors.snappy;
 import java.io.IOException;
 import java.io.InputStream;
 
-import org.apache.commons.compress.compressors.CompressorInputStream;
+import 
org.apache.commons.compress.compressors.lz77support.AbstractLZ77CompressorInputStream;
 import org.apache.commons.compress.utils.ByteUtils;
 import org.apache.commons.compress.utils.IOUtils;
 
@@ -39,7 +39,7 @@ import org.apache.commons.compress.utils.IOUtils;
  * @see <a href="https://github.com/google/snappy/blob/master/format_description.txt">Snappy compressed format description</a>
  * @since 1.7
  */
-public class SnappyCompressorInputStream extends CompressorInputStream {
+public class SnappyCompressorInputStream extends 
AbstractLZ77CompressorInputStream {
 
     /** Mask used to determine the type of "tag" is being processed */
     private static final int TAG_MASK = 0x03;
@@ -47,39 +47,17 @@ public class SnappyCompressorInputStream extends 
CompressorInputStream {
     /** Default block size */
     public static final int DEFAULT_BLOCK_SIZE = 32768;
 
-    /** Buffer to write decompressed bytes to for back-references */
-    private final byte[] decompressBuf;
-
-    /** One behind the index of the last byte in the buffer that was written */
-    private int writeIndex;
-
-    /** Index of the next byte to be read. */
-    private int readIndex;
-
-    /** The actual block size specified */
-    private final int blockSize;
-
-    /** The underlying stream to read compressed data from */
-    private final InputStream in;
-
     /** The size of the uncompressed data */
     private final int size;
 
     /** Number of uncompressed bytes still to be read. */
     private int uncompressedBytesRemaining;
 
-    // used in no-arg read method
-    private final byte[] oneByte = new byte[1];
+    /** Current state of the stream */
+    private State state = State.NO_BLOCK;
 
     private boolean endReached = false;
 
-    private final ByteUtils.ByteSupplier supplier = new 
ByteUtils.ByteSupplier() {
-        @Override
-        public int getAsByte() throws IOException {
-            return readOneByte();
-        }
-    };
-
     /**
      * Constructor using the default buffer size of 32k.
      * 
@@ -104,31 +82,10 @@ public class SnappyCompressorInputStream extends 
CompressorInputStream {
      */
     public SnappyCompressorInputStream(final InputStream is, final int 
blockSize)
             throws IOException {
-        this.in = is;
-        this.blockSize = blockSize;
-        this.decompressBuf = new byte[blockSize * 3];
-        this.writeIndex = readIndex = 0;
+        super(is, blockSize);
         uncompressedBytesRemaining = size = (int) readSize();
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public int read() throws IOException {
-        return read(oneByte, 0, 1) == -1 ? -1 : oneByte[0] & 0xFF;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public void close() throws IOException {
-        in.close();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public int available() {
-        return writeIndex - readIndex;
-    }
-
     /**
      * {@inheritDoc}
      */
@@ -137,21 +94,25 @@ public class SnappyCompressorInputStream extends 
CompressorInputStream {
         if (endReached) {
             return -1;
         }
-        final int avail = available();
-        if (len > avail) {
-            fill(len - avail);
-        }
-
-        final int readable = Math.min(len, available());
-        if (readable == 0 && len > 0) {
-            return -1;
-        }
-        System.arraycopy(decompressBuf, readIndex, b, off, readable);
-        readIndex += readable;
-        if (readIndex > 2 * blockSize) {
-            slideBuffer();
+        switch (state) {
+        case NO_BLOCK:
+            fill(len);
+            return read(b, off, len);
+        case IN_LITERAL:
+            int litLen = readLiteral(b, off, len);
+            if (!hasMoreDataInBlock()) {
+                state = State.NO_BLOCK;
+            }
+            return litLen;
+        case IN_COPY:
+            int copyLen = readCopy(b, off, len);
+            if (!hasMoreDataInBlock()) {
+                state = State.NO_BLOCK;
+            }
+            return copyLen;
+        default:
+            throw new IOException("Unknown stream state " + state);
         }
-        return readable;
     }
 
     /**
@@ -163,13 +124,15 @@ public class SnappyCompressorInputStream extends 
CompressorInputStream {
     private void fill(final int len) throws IOException {
         if (uncompressedBytesRemaining == 0) {
             endReached = true;
+            return;
         }
-        int readNow = Math.min(len, uncompressedBytesRemaining);
 
-        while (readNow > 0) {
-            final int b = readOneByte();
+            int b = readOneByte();
+            if (b == -1) {
+                throw new IOException("Premature end of stream reading block 
start");
+            }
             int length = 0;
-            long offset = 0;
+            int offset = 0;
 
             switch (b & TAG_MASK) {
 
@@ -177,10 +140,8 @@ public class SnappyCompressorInputStream extends 
CompressorInputStream {
 
                 length = readLiteralLength(b);
                 uncompressedBytesRemaining -= length;
-
-                if (expandLiteral(length)) {
-                    return;
-                }
+                startLiteral(length);
+                state = State.IN_LITERAL;
                 break;
 
             case 0x01:
@@ -197,11 +158,14 @@ public class SnappyCompressorInputStream extends 
CompressorInputStream {
                 length = 4 + ((b >> 2) & 0x07);
                 uncompressedBytesRemaining -= length;
                 offset = (b & 0xE0) << 3;
-                offset |= readOneByte();
-
-                if (expandCopy(offset, length)) {
-                    return;
+                b = readOneByte();
+                if (b == -1) {
+                    throw new IOException("Premature end of stream reading 
copy length");
                 }
+                offset |= b;
+
+                startCopy(offset, length);
+                state = State.IN_COPY;
                 break;
 
             case 0x02:
@@ -217,11 +181,10 @@ public class SnappyCompressorInputStream extends 
CompressorInputStream {
                 length = (b >> 2) + 1;
                 uncompressedBytesRemaining -= length;
 
-                offset = ByteUtils.fromLittleEndian(supplier, 2);
+                offset = (int) ByteUtils.fromLittleEndian(supplier, 2);
 
-                if (expandCopy(offset, length)) {
-                    return;
-                }
+                startCopy(offset, length);
+                state = State.IN_COPY;
                 break;
 
             case 0x03:
@@ -236,32 +199,14 @@ public class SnappyCompressorInputStream extends 
CompressorInputStream {
                 length = (b >> 2) + 1;
                 uncompressedBytesRemaining -= length;
 
-                offset = ByteUtils.fromLittleEndian(supplier, 4);
+                offset = (int) ByteUtils.fromLittleEndian(supplier, 4) & 
0x7fffffff;
 
-                if (expandCopy(offset, length)) {
-                    return;
-                }
+                startCopy(offset, length);
+                state = State.IN_COPY;
                 break;
             }
-
-            readNow -= length;
-        }
     }
 
-    /**
-     * Slide buffer.
-     *
-     * <p>Move all bytes of the buffer after the first block down to
-     * the beginning of the buffer.</p>
-     */
-    private void slideBuffer() {
-        System.arraycopy(decompressBuf, blockSize, decompressBuf, 0,
-                         blockSize * 2);
-        writeIndex -= blockSize;
-        readIndex -= blockSize;
-    }
-
-
     /*
      * For literals up to and including 60 bytes in length, the
      * upper six bits of the tag byte contain (len-1). The literal
@@ -277,6 +222,9 @@ public class SnappyCompressorInputStream extends 
CompressorInputStream {
         switch (b >> 2) {
         case 60:
             length = readOneByte();
+            if (length == -1) {
+                throw new IOException("Premature end of stream reading literal 
length");
+            }
             break;
         case 61:
             length = (int) ByteUtils.fromLittleEndian(supplier, 2);
@@ -296,112 +244,6 @@ public class SnappyCompressorInputStream extends 
CompressorInputStream {
     }
 
     /**
-     * Literals are uncompressed data stored directly in the byte stream.
-     * 
-     * @param length
-     *            The number of bytes to read from the underlying stream
-     * 
-     * @throws IOException
-     *             If the first byte cannot be read for any reason other than
-     *             end of file, or if the input stream has been closed, or if
-     *             some other I/O error occurs.
-     * @return True if the decompressed data should be flushed
-     */
-    private boolean expandLiteral(final int length) throws IOException {
-        boolean shouldFlush = ensureBufferSpace(length);
-        final int bytesRead = IOUtils.readFully(in, decompressBuf, writeIndex, 
length);
-        count(bytesRead);
-        if (length != bytesRead) {
-            throw new IOException("Premature end of stream");
-        }
-
-        writeIndex += length;
-        return shouldFlush || writeIndex >= 2 * this.blockSize;
-    }
-
-    private boolean ensureBufferSpace(final int length) {
-        if (writeIndex + length >= decompressBuf.length) {
-            slideBuffer();
-            return true;
-        }
-        return false;
-    }
-
-    /**
-     * Copies are references back into previous decompressed data, telling the
-     * decompressor to reuse data it has previously decoded. They encode two
-     * values: The offset, saying how many bytes back from the current position
-     * to read, and the length, how many bytes to copy. Offsets of zero can be
-     * encoded, but are not legal; similarly, it is possible to encode
-     * backreferences that would go past the end of the block (offset > current
-     * decompressed position), which is also nonsensical and thus not allowed.
-     * 
-     * @param off
-     *            The offset from the backward from the end of expanded stream
-     * @param length
-     *            The number of bytes to copy
-     * 
-     * @throws IOException
-     *             An the offset expands past the front of the decompression
-     *             buffer
-     * @return True if the decompressed data should be flushed
-     */
-    private boolean expandCopy(final long off, final int length) throws 
IOException {
-        if (off > blockSize) {
-            throw new IOException("Offset " + off + " is larger than block 
size " + blockSize);
-        }
-        final int offset = (int) off;
-        boolean shouldFlush = ensureBufferSpace(length);
-
-        if (offset == 1) {
-            final byte lastChar = decompressBuf[writeIndex - 1];
-            for (int i = 0; i < length; i++) {
-                decompressBuf[writeIndex++] = lastChar;
-            }
-        } else if (length < offset) {
-            System.arraycopy(decompressBuf, writeIndex - offset,
-                    decompressBuf, writeIndex, length);
-            writeIndex += length;
-        } else {
-            int fullRotations = length / offset;
-            final int pad = length - (offset * fullRotations);
-
-            while (fullRotations-- != 0) {
-                System.arraycopy(decompressBuf, writeIndex - offset,
-                        decompressBuf, writeIndex, offset);
-                writeIndex += offset;
-            }
-
-            if (pad > 0) {
-                System.arraycopy(decompressBuf, writeIndex - offset,
-                        decompressBuf, writeIndex, pad);
-
-                writeIndex += pad;
-            }
-        }
-        return shouldFlush || writeIndex >= 2 * this.blockSize;
-    }
-
-    /**
-     * This helper method reads the next byte of data from the input stream. 
The
-     * value byte is returned as an <code>int</code> in the range 
<code>0</code>
-     * to <code>255</code>. If no byte is available because the end of the
-     * stream has been reached, an Exception is thrown.
-     * 
-     * @return The next byte of data
-     * @throws IOException
-     *             EOF is reached or error reading the stream
-     */
-    private int readOneByte() throws IOException {
-        final int b = in.read();
-        if (b == -1) {
-            throw new IOException("Premature end of stream");
-        }
-        count(1);
-        return b & 0xFF;
-    }
-
-    /**
      * The stream starts with the uncompressed length (up to a maximum of 2^32 
-
      * 1), stored as a little-endian varint. Varints consist of a series of
      * bytes, where the lower 7 bits are data and the upper bit is set iff 
there
@@ -421,6 +263,9 @@ public class SnappyCompressorInputStream extends 
CompressorInputStream {
 
         do {
             b = readOneByte();
+            if (b == -1) {
+                throw new IOException("Premature end of stream reading size");
+            }
             sz |= (b & 0x7f) << (index++ * 7);
         } while (0 != (b & 0x80));
         return sz;
@@ -431,8 +276,12 @@ public class SnappyCompressorInputStream extends 
CompressorInputStream {
      * 
      * @return the uncompressed size
      */
+    @Override
     public int getSize() {
         return size;
     }
 
+    private enum State {
+        NO_BLOCK, IN_LITERAL, IN_COPY
+    }
 }

http://git-wip-us.apache.org/repos/asf/commons-compress/blob/a3620e04/src/test/java/org/apache/commons/compress/compressors/snappy/FramedSnappyCompressorInputStreamTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/commons/compress/compressors/snappy/FramedSnappyCompressorInputStreamTest.java b/src/test/java/org/apache/commons/compress/compressors/snappy/FramedSnappyCompressorInputStreamTest.java
index 0f07041..cac4b2c 100644
--- a/src/test/java/org/apache/commons/compress/compressors/snappy/FramedSnappyCompressorInputStreamTest.java
+++ b/src/test/java/org/apache/commons/compress/compressors/snappy/FramedSnappyCompressorInputStreamTest.java
@@ -119,10 +119,9 @@ public final class FramedSnappyCompressorInputStreamTest
             assertEquals(3, in.available()); // remainder of first 
uncompressed block
             assertEquals(3, in.read(new byte[5], 0, 3));
             assertEquals('5', in.read());
-            assertEquals(4, in.available()); // remainder of literal
+            assertEquals(0, in.available()); // end of chunk, must read next 
one
             assertEquals(4, in.read(new byte[5], 0, 4));
             assertEquals('5', in.read());
-            assertEquals(19, in.available()); // remainder of copy
             in.close();
         }
     }

http://git-wip-us.apache.org/repos/asf/commons-compress/blob/a3620e04/src/test/java/org/apache/commons/compress/compressors/snappy/SnappyRoundtripTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/commons/compress/compressors/snappy/SnappyRoundtripTest.java b/src/test/java/org/apache/commons/compress/compressors/snappy/SnappyRoundtripTest.java
index b626b17..481d07c 100644
--- a/src/test/java/org/apache/commons/compress/compressors/snappy/SnappyRoundtripTest.java
+++ b/src/test/java/org/apache/commons/compress/compressors/snappy/SnappyRoundtripTest.java
@@ -46,6 +46,7 @@ public final class SnappyRoundtripTest extends 
AbstractTestCase {
         }
         System.err.println(input.getName() + " written, uncompressed bytes: " 
+ input.length()
             + ", compressed bytes: " + outputSz.length() + " after " + 
(System.currentTimeMillis() - start) + "ms");
+        start = System.currentTimeMillis();
         try (FileInputStream is = new FileInputStream(input);
              SnappyCompressorInputStream sis = new 
SnappyCompressorInputStream(new FileInputStream(outputSz),
                  params.getWindowSize())) {
@@ -53,6 +54,7 @@ public final class SnappyRoundtripTest extends 
AbstractTestCase {
             byte[] actual = IOUtils.toByteArray(sis);
             Assert.assertArrayEquals(expected, actual);
         }
+        System.err.println(outputSz.getName() + " read after " + 
(System.currentTimeMillis() - start) + "ms");
     }
 
     // should yield decent compression

Reply via email to