Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0b3cfae4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0b3cfae4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0b3cfae4

Branch: refs/heads/cassandra-3.0
Commit: 0b3cfae4e619d1ece5ff8afc774eeb52b93166d8
Parents: 9fe790d 056055f
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 23 13:58:58 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 13:58:58 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compress/CompressedInputStream.java         | 46 ++++++++--
 .../compress/CompressedStreamReader.java        |  4 +
 .../compression/CompressedInputStreamTest.java  | 88 ++++++++++++++++----
 4 files changed, 114 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 4182cc1,d11be26..608d8f8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -11,7 -3,16 +11,8 @@@ Merged from 2.2
   * Fix SimpleDateType type compatibility (CASSANDRA-10027)
   * (Hadoop) fix splits calculation (CASSANDRA-10640)
   * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
-  * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
-  * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
-  * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
-  * Expose phi values from failure detector via JMX and tweak debug
-    and trace logging (CASSANDRA-9526)
-  * Fix RangeNamesQueryPager (CASSANDRA-10509)
-  * Deprecate Pig support (CASSANDRA-10542)
-  * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
  Merged from 2.1:
+  * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
   * (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
   * Try next replica if not possible to connect to primary replica on ColumnFamilyRecordReader (CASSANDRA-2388)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index ccd0ac5,daa339a..56dc63a
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -42,10 -41,9 +42,10 @@@ public class CompressedInputStream exte
      private final CompressionInfo info;
      // chunk buffer
      private final BlockingQueue<byte[]> dataBuffer;
 +    private final Supplier<Double> crcCheckChanceSupplier;

      // uncompressed bytes
 -    private byte[] buffer;
 +    private final byte[] buffer;

      // offset from the beginning of the buffer
      protected long bufferOffset = 0;
@@@ -67,16 -67,16 +69,17 @@@
       * @param source Input source to read compressed data from
       * @param info Compression info
       */
 -    public CompressedInputStream(InputStream source, CompressionInfo info)
 +    public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType, Supplier<Double> crcCheckChanceSupplier)
      {
          this.info = info;
 -        this.checksum = new Adler32();
 +        this.checksum = checksumType.newInstance();
          this.buffer = new byte[info.parameters.chunkLength()];
          // buffer is limited to store up to 1024 chunks
 -        this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
 +        this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
 +        this.crcCheckChanceSupplier = crcCheckChanceSupplier;

 -        new Thread(new Reader(source, info, dataBuffer)).start();
 +        readerThread = new Thread(new Reader(source, info, dataBuffer));
 +        readerThread.start();
      }

      public int read() throws IOException


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index 5646592,0000000..2162e32
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@@ -1,129 -1,0 +1,183 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements. See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership. The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License. You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.streaming.compression;
 +
 +import java.io.*;
 +import java.util.*;
++import java.util.concurrent.SynchronousQueue;
++import java.util.concurrent.TimeUnit;
 +
 +import org.junit.Test;
 +import org.apache.cassandra.db.ClusteringComparator;
 +import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 +import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.schema.CompressionParams;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.streaming.compress.CompressedInputStream;
 +import org.apache.cassandra.streaming.compress.CompressionInfo;
 +import org.apache.cassandra.utils.ChecksumType;
 +import org.apache.cassandra.utils.Pair;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +/**
 + */
 +public class CompressedInputStreamTest
 +{
 +    @Test
 +    public void testCompressedRead() throws Exception
 +    {
 +        testCompressedReadWith(new long[]{0L}, false);
 +        testCompressedReadWith(new long[]{1L}, false);
 +        testCompressedReadWith(new long[]{100L}, false);
 +
 +        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false);
 +    }
 +
 +    @Test(expected = EOFException.class)
 +    public void testTruncatedRead() throws Exception
 +    {
 +        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
 +    }
++
++    /**
++     * Test that CompressedInputStream does not hang when it is closed while a read is blocked
++     * @throws IOException
++     */
++    @Test(expected = EOFException.class)
++    public void testClose() throws IOException
++    {
++        CompressionParams param = CompressionParams.snappy(32);
++        CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
++        final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
++        InputStream blockingInput = new InputStream()
++        {
++            @Override
++            public int read() throws IOException
++            {
++                try
++                {
++                    // 10 second cut-off so a hang here does not hold up the rest of the test suite
++                    return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
++                }
++                catch (InterruptedException e)
++                {
++                    throw new IOException("Interrupted as expected", e);
++                }
++            }
++        };
++        CompressionInfo info = new CompressionInfo(chunks, param);
++        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, ChecksumType.CRC32, () -> 1.0))
++        {
++            new Thread(new Runnable()
++            {
++                @Override
++                public void run()
++                {
++                    try
++                    {
++                        cis.close();
++                    }
++                    catch (Exception ignore) {}
++                }
++            }).start();
++            // this read blocks; close() on the other thread must unblock it with an EOFException
++            cis.read();
++        }
++    }
++
 +    /**
 +     * @param valuesToCheck array of longs in the range 0-999
 +     * @throws Exception
 +     */
 +    private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate) throws Exception
 +    {
 +        assert valuesToCheck != null && valuesToCheck.length > 0;
 +
 +        // write compressed data file of longs
 +        File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
 +        Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
 +        MetadataCollector collector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
 +        CompressionParams param = CompressionParams.snappy(32);
-         CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
 +        Map<Long, Long> index = new HashMap<Long, Long>();
-         for (long l = 0L; l < 1000; l++)
++        try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
 +        {
-             index.put(l, writer.position());
-             writer.writeLong(l);
++            for (long l = 0L; l < 1000; l++)
++            {
++                index.put(l, writer.position());
++                writer.writeLong(l);
++            }
++            writer.finish();
 +        }
-         writer.finish();
 +
 +        CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
-         List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
++        List<Pair<Long, Long>> sections = new ArrayList<>();
 +        for (long l : valuesToCheck)
 +        {
 +            long position = index.get(l);
 +            sections.add(Pair.create(position, position + 8));
 +        }
 +        CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections);
 +        long totalSize = comp.getTotalSizeForSections(sections);
 +        long expectedSize = 0;
 +        for (CompressionMetadata.Chunk c : chunks)
 +            expectedSize += c.length + 4;
 +        assertEquals(expectedSize, totalSize);
 +
 +        // buffer up only relevant parts of file
 +        int size = 0;
 +        for (CompressionMetadata.Chunk c : chunks)
 +            size += (c.length + 4); // 4-byte CRC
 +        byte[] toRead = new byte[size];
 +
-         RandomAccessFile f = new RandomAccessFile(tmp, "r");
-         int pos = 0;
-         for (CompressionMetadata.Chunk c : chunks)
++        try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
 +        {
-             f.seek(c.offset);
-             pos += f.read(toRead, pos, c.length + 4);
++            int pos = 0;
++            for (CompressionMetadata.Chunk c : chunks)
++            {
++                f.seek(c.offset);
++                pos += f.read(toRead, pos, c.length + 4);
++            }
 +        }
-         f.close();
 +
 +        if (testTruncate)
 +        {
 +            byte[] actuallyRead = new byte[50];
 +            System.arraycopy(toRead, 0, actuallyRead, 0, 50);
 +            toRead = actuallyRead;
 +        }
 +
 +        // read buffer using CompressedInputStream
 +        CompressionInfo info = new CompressionInfo(chunks, param);
 +        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info,
 +                                                                ChecksumType.CRC32, () -> 1.0);
-         DataInputStream in = new DataInputStream(input);
 +
-         for (int i = 0; i < sections.size(); i++)
++        try (DataInputStream in = new DataInputStream(input))
 +        {
-             input.position(sections.get(i).left);
-             long readValue = in.readLong();
-             assert readValue == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + readValue;
++            for (int i = 0; i < sections.size(); i++)
++            {
++                input.position(sections.get(i).left);
++                long readValue = in.readLong();
++                assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
++            }
 +        }
 +    }
 +}
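
----------------------------------------------------------------------
The pattern that this change and the new testClose case exercise is independent of Cassandra: chunks flow from a background reader thread to the consumer through a bounded BlockingQueue, and close() must wake up both sides so that neither one hangs. The sketch below is illustrative only, not the actual CompressedInputStream implementation; the class name InterruptibleChunkedStream, its fields, the queue capacity of 16, and the sentinel handling are all invented for this example. It shows one way to get the behaviour the test asserts: a read() that is blocked waiting for data finishes with an EOFException once close() is called.

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Illustrative sketch (not the Cassandra implementation): chunks are produced by a
 * background reader thread and handed over through a bounded queue. close() interrupts
 * the reader, and the reader always publishes a sentinel chunk on its way out, so a
 * consumer blocked in read() wakes up with an EOFException instead of hanging.
 */
public class InterruptibleChunkedStream extends InputStream
{
    private static final byte[] POISON_PILL = new byte[0];

    private final BlockingQueue<byte[]> chunks = new ArrayBlockingQueue<>(16);
    private final Thread readerThread;
    private byte[] current = POISON_PILL;
    private int position = 0;
    private boolean eof = false;

    public InterruptibleChunkedStream(InputStream source, int chunkSize)
    {
        readerThread = new Thread(() -> {
            try
            {
                byte[] buf = new byte[chunkSize];
                int n;
                while ((n = source.read(buf)) != -1)
                    chunks.put(Arrays.copyOf(buf, n)); // blocks if the consumer is slow
                chunks.put(POISON_PILL);               // normal end of stream
            }
            catch (IOException | InterruptedException e)
            {
                // closed (interrupted) or the source failed: drop buffered chunks so the
                // sentinel is guaranteed to fit, then wake any consumer blocked in read()
                chunks.clear();
                chunks.offer(POISON_PILL);
            }
        });
        readerThread.start();
    }

    @Override
    public int read() throws IOException
    {
        if (eof)
            throw new EOFException("stream closed or exhausted");
        if (position == current.length)
        {
            try
            {
                current = chunks.take(); // blocks until a chunk or the sentinel arrives
            }
            catch (InterruptedException e)
            {
                throw new EOFException("interrupted while waiting for data");
            }
            position = 0;
            if (current == POISON_PILL)
            {
                // surface end-of-data as EOFException, which is what testClose expects
                eof = true;
                throw new EOFException("stream closed or exhausted");
            }
        }
        return current[position++] & 0xFF;
    }

    @Override
    public void close()
    {
        // wakes a reader blocked in put(); a source whose read() is interruptible
        // (like the blocking source in testClose) is unblocked as well
        readerThread.interrupt();
    }
}

Used the way testClose uses the real class (one thread calls close() while another is blocked in read()), the blocked read ends with an EOFException instead of waiting forever. Publishing the sentinel from the reader's exit path, rather than relying on the interrupt alone, is what makes the wake-up reliable even if close() runs before the consumer has started waiting on the queue.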