Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 558bb7246 -> bd692ba74
Throw EOFException if we run out of chunks in compressed file Patch by marcuse; reviewed by yukim for CASSANDRA-7664. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/76adf0e1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/76adf0e1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/76adf0e1 Branch: refs/heads/cassandra-2.1 Commit: 76adf0e12ed91ee7c75164872202bff29a2ad7f4 Parents: ecf1bae Author: Marcus Eriksson <marc...@apache.org> Authored: Tue Aug 19 11:45:57 2014 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Wed Aug 20 08:21:43 2014 +0200 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../compress/CompressedInputStream.java | 17 +++++++++++++-- .../compress/CompressedInputStreamTest.java | 23 +++++++++++++++----- 3 files changed, 35 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/76adf0e1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 304d9bf..c8f7591 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 2.0.10 + * Throw EOFException if we run out of chunks in compressed datafile + (CASSANDRA-7664) * Throw InvalidRequestException when queries contain relations on entire collection columns (CASSANDRA-7506) * Fix PRSI handling of CQL3 row markers for row cleanup (CASSANDRA-7787) http://git-wip-us.apache.org/repos/asf/cassandra/blob/76adf0e1/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java index 698c2fe..ef019c2 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java @@ -58,6 +58,8 @@ public class CompressedInputStream extends InputStream // raw checksum bytes private final byte[] checksumBytes = new byte[4]; + private static final byte[] POISON_PILL = new byte[0]; + private long totalCompressedBytesRead; private final boolean hasPostCompressionAdlerChecksums; @@ -83,7 +85,10 @@ public class CompressedInputStream extends InputStream { try { - decompress(dataBuffer.take()); + byte[] compressedWithCRC = dataBuffer.take(); + if (compressedWithCRC == POISON_PILL) + throw new EOFException("No chunk available"); + decompress(compressedWithCRC); } catch (InterruptedException e) { @@ -162,7 +167,15 @@ public class CompressedInputStream extends InputStream int bufferRead = 0; while (bufferRead < readLength) - bufferRead += source.read(compressedWithCRC, bufferRead, readLength - bufferRead); + { + int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead); + if (r < 0) + { + dataBuffer.put(POISON_PILL); + return; // throw exception where we consume dataBuffer + } + bufferRead += r; + } dataBuffer.put(compressedWithCRC); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/76adf0e1/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java index 027c84c..532b506 100644 --- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java +++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java @@ -19,6 +19,7 @@ package org.apache.cassandra.streaming.compress; import java.io.ByteArrayInputStream; import java.io.DataInputStream; +import java.io.EOFException; import java.io.File; import java.io.RandomAccessFile; import java.util.*; @@ -42,18 +43,23 @@ public class CompressedInputStreamTest @Test public void testCompressedRead() throws Exception { - testCompressedReadWith(new long[]{0L}); - testCompressedReadWith(new long[]{1L}); - testCompressedReadWith(new long[]{100L}); + testCompressedReadWith(new long[]{0L}, false); + testCompressedReadWith(new long[]{1L}, false); + testCompressedReadWith(new long[]{100L}, false); - testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}); + testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false); } + @Test(expected = EOFException.class) + public void testTruncatedRead() throws Exception + { + testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true); + } /** * @param valuesToCheck array of longs of range(0-999) * @throws Exception */ - private void testCompressedReadWith(long[] valuesToCheck) throws Exception + private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate) throws Exception { assert valuesToCheck != null && valuesToCheck.length > 0; @@ -95,6 +101,13 @@ public class CompressedInputStreamTest } f.close(); + if (testTruncate) + { + byte [] actuallyRead = new byte[50]; + System.arraycopy(toRead, 0, actuallyRead, 0, 50); + toRead = actuallyRead; + } + // read buffer using CompressedInputStream CompressionInfo info = new CompressionInfo(chunks, param); CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);