Merge branch 'cassandra-2.2' into cassandra-3.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0b3cfae4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0b3cfae4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0b3cfae4

Branch: refs/heads/cassandra-3.0
Commit: 0b3cfae4e619d1ece5ff8afc774eeb52b93166d8
Parents: 9fe790d 056055f
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 23 13:58:58 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 13:58:58 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compress/CompressedInputStream.java         | 46 ++++++++--
 .../compress/CompressedStreamReader.java        |  4 +
 .../compression/CompressedInputStreamTest.java  | 88 ++++++++++++++++----
 4 files changed, 114 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 4182cc1,d11be26..608d8f8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -11,7 -3,16 +11,8 @@@ Merged from 2.2
   * Fix SimpleDateType type compatibility (CASSANDRA-10027)
   * (Hadoop) fix splits calculation (CASSANDRA-10640)
   * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 - * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 - * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 - * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 - * Expose phi values from failure detector via JMX and tweak debug
 -   and trace logging (CASSANDRA-9526)
 - * Fix RangeNamesQueryPager (CASSANDRA-10509)
 - * Deprecate Pig support (CASSANDRA-10542)
 - * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
  Merged from 2.1:
+  * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
   * (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
   * Try next replica if not possible to connect to primary replica on
     ColumnFamilyRecordReader (CASSANDRA-2388)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index ccd0ac5,daa339a..56dc63a
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -42,10 -41,9 +42,10 @@@ public class CompressedInputStream exte
      private final CompressionInfo info;
      // chunk buffer
      private final BlockingQueue<byte[]> dataBuffer;
 +    private final Supplier<Double> crcCheckChanceSupplier;
  
      // uncompressed bytes
-     private byte[] buffer;
+     private final byte[] buffer;
  
      // offset from the beginning of the buffer
      protected long bufferOffset = 0;
@@@ -67,16 -67,16 +69,17 @@@
       * @param source Input source to read compressed data from
       * @param info Compression info
       */
 -    public CompressedInputStream(InputStream source, CompressionInfo info)
 +    public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType, Supplier<Double> crcCheckChanceSupplier)
      {
          this.info = info;
 -        this.checksum =  new Adler32();
 +        this.checksum =  checksumType.newInstance();
          this.buffer = new byte[info.parameters.chunkLength()];
          // buffer is limited to store up to 1024 chunks
-         this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+         this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
 +        this.crcCheckChanceSupplier = crcCheckChanceSupplier;
  
-         new Thread(new Reader(source, info, dataBuffer)).start();
+         readerThread = new Thread(new Reader(source, info, dataBuffer));
+         readerThread.start();
      }
  
      public int read() throws IOException
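
Note: the hunk above shows only the constructor side of the CASSANDRA-10012 cleanup. The background Reader is now held in a readerThread field instead of being started as an anonymous thread, and the stream additionally takes a ChecksumType and a crc_check_chance supplier. The close() hunk itself is not included in this message, so the following is only a rough, self-contained sketch (all names made up for illustration, not Cassandra code) of the general pattern such a field enables: close() interrupts the producer thread, which then enqueues a sentinel so a consumer blocked on the queue fails fast with EOFException instead of hanging, matching the behaviour the new testClose case below asserts.

    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Arrays;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Illustration only -- not the Cassandra implementation.
    class InterruptibleChunkReader implements AutoCloseable
    {
        private static final byte[] POISON_PILL = new byte[0]; // sentinel: "no more data"

        private final BlockingQueue<byte[]> dataBuffer = new ArrayBlockingQueue<>(1024);
        private final Thread readerThread; // kept in a field so close() can reach it

        InterruptibleChunkReader(InputStream source)
        {
            readerThread = new Thread(() ->
            {
                try
                {
                    byte[] chunk = new byte[4096];
                    int n;
                    while ((n = source.read(chunk)) != -1)
                        dataBuffer.put(Arrays.copyOf(chunk, n));
                }
                catch (IOException | InterruptedException e)
                {
                    // interrupted by close() or the source failed: fall through and signal the consumer
                }
                dataBuffer.offer(POISON_PILL); // wakes a consumer blocked on take()
            });
            readerThread.start();
        }

        byte[] nextChunk() throws IOException
        {
            try
            {
                byte[] chunk = dataBuffer.take();
                if (chunk == POISON_PILL)
                    throw new EOFException("stream closed before the chunk arrived");
                return chunk;
            }
            catch (InterruptedException e)
            {
                throw new IOException(e);
            }
        }

        @Override
        public void close()
        {
            readerThread.interrupt(); // stop the producer; it then enqueues the sentinel
        }
    }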

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index 5646592,0000000..2162e32
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@@ -1,129 -1,0 +1,183 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.streaming.compression;
 +
 +import java.io.*;
 +import java.util.*;
++import java.util.concurrent.SynchronousQueue;
++import java.util.concurrent.TimeUnit;
 +
 +import org.junit.Test;
 +import org.apache.cassandra.db.ClusteringComparator;
 +import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 +import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.schema.CompressionParams;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.streaming.compress.CompressedInputStream;
 +import org.apache.cassandra.streaming.compress.CompressionInfo;
 +import org.apache.cassandra.utils.ChecksumType;
 +import org.apache.cassandra.utils.Pair;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +/**
 + */
 +public class CompressedInputStreamTest
 +{
 +    @Test
 +    public void testCompressedRead() throws Exception
 +    {
 +        testCompressedReadWith(new long[]{0L}, false);
 +        testCompressedReadWith(new long[]{1L}, false);
 +        testCompressedReadWith(new long[]{100L}, false);
 +
 +        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false);
 +    }
 +
 +    @Test(expected = EOFException.class)
 +    public void testTruncatedRead() throws Exception
 +    {
 +        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
 +    }
++
++    /**
++     * Test CompressedInputStream not hang when closed while reading
++     * @throws IOException
++     */
++    @Test(expected = EOFException.class)
++    public void testClose() throws IOException
++    {
++        CompressionParams param = CompressionParams.snappy(32);
++        CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
++        final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
++        InputStream blockingInput = new InputStream()
++        {
++            @Override
++            public int read() throws IOException
++            {
++                try
++                {
++                    // 10 second cut off not to stop other test in case
++                    return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
++                }
++                catch (InterruptedException e)
++                {
++                    throw new IOException("Interrupted as expected", e);
++                }
++            }
++        };
++        CompressionInfo info = new CompressionInfo(chunks, param);
++        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, ChecksumType.CRC32, () -> 1.0))
++        {
++            new Thread(new Runnable()
++            {
++                @Override
++                public void run()
++                {
++                    try
++                    {
++                        cis.close();
++                    }
++                    catch (Exception ignore) {}
++                }
++            }).start();
++            // block here
++            cis.read();
++        }
++    }
++
 +    /**
 +     * @param valuesToCheck array of longs of range(0-999)
 +     * @throws Exception
 +     */
 +    private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate) throws Exception
 +    {
 +        assert valuesToCheck != null && valuesToCheck.length > 0;
 +
 +        // write compressed data file of longs
 +        File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
 +        Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
 +        MetadataCollector collector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
 +        CompressionParams param = CompressionParams.snappy(32);
-         CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
 +        Map<Long, Long> index = new HashMap<Long, Long>();
-         for (long l = 0L; l < 1000; l++)
++        try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
 +        {
-             index.put(l, writer.position());
-             writer.writeLong(l);
++            for (long l = 0L; l < 1000; l++)
++            {
++                index.put(l, writer.position());
++                writer.writeLong(l);
++            }
++            writer.finish();
 +        }
-         writer.finish();
 +
 +        CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
-         List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
++        List<Pair<Long, Long>> sections = new ArrayList<>();
 +        for (long l : valuesToCheck)
 +        {
 +            long position = index.get(l);
 +            sections.add(Pair.create(position, position + 8));
 +        }
 +        CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections);
 +        long totalSize = comp.getTotalSizeForSections(sections);
 +        long expectedSize = 0;
 +        for (CompressionMetadata.Chunk c : chunks)
 +            expectedSize += c.length + 4;
 +        assertEquals(expectedSize, totalSize);
 +
 +        // buffer up only relevant parts of file
 +        int size = 0;
 +        for (CompressionMetadata.Chunk c : chunks)
 +            size += (c.length + 4); // 4bytes CRC
 +        byte[] toRead = new byte[size];
 +
-         RandomAccessFile f = new RandomAccessFile(tmp, "r");
-         int pos = 0;
-         for (CompressionMetadata.Chunk c : chunks)
++        try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
 +        {
-             f.seek(c.offset);
-             pos += f.read(toRead, pos, c.length + 4);
++            int pos = 0;
++            for (CompressionMetadata.Chunk c : chunks)
++            {
++                f.seek(c.offset);
++                pos += f.read(toRead, pos, c.length + 4);
++            }
 +        }
-         f.close();
 +
 +        if (testTruncate)
 +        {
 +            byte [] actuallyRead = new byte[50];
 +            System.arraycopy(toRead, 0, actuallyRead, 0, 50);
 +            toRead = actuallyRead;
 +        }
 +
 +        // read buffer using CompressedInputStream
 +        CompressionInfo info = new CompressionInfo(chunks, param);
 +        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info,
 +                                                                ChecksumType.CRC32, () -> 1.0);
-         DataInputStream in = new DataInputStream(input);
 +
-         for (int i = 0; i < sections.size(); i++)
++        try (DataInputStream in = new DataInputStream(input))
 +        {
-             input.position(sections.get(i).left);
-             long readValue = in.readLong();
-             assert readValue == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + readValue;
++            for (int i = 0; i < sections.size(); i++)
++            {
++                input.position(sections.get(i).left);
++                long readValue = in.readLong();
++                assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
++            }
 +        }
 +    }
 +}
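
As a caller-side reference, below is a minimal, hypothetical usage of the four-argument constructor exercised by this test. The class and method names are invented for illustration; the ChecksumType.CRC32 argument and the constant () -> 1.0 crc-check-chance supplier simply mirror what the test passes, and closing the stream is expected to release its background reader (see testClose above).

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;

    import org.apache.cassandra.io.compress.CompressionMetadata;
    import org.apache.cassandra.schema.CompressionParams;
    import org.apache.cassandra.streaming.compress.CompressedInputStream;
    import org.apache.cassandra.streaming.compress.CompressionInfo;
    import org.apache.cassandra.utils.ChecksumType;

    // Hypothetical helper, not part of the patch.
    public class CompressedInputStreamUsage
    {
        /** Reads the first long from already-buffered compressed chunks. */
        static long readFirstLong(byte[] compressedBytes, CompressionMetadata.Chunk[] chunks, CompressionParams param) throws IOException
        {
            CompressionInfo info = new CompressionInfo(chunks, param);
            // New signature from this merge: explicit checksum type plus a crc_check_chance supplier.
            try (CompressedInputStream cis = new CompressedInputStream(new ByteArrayInputStream(compressedBytes), info, ChecksumType.CRC32, () -> 1.0);
                 DataInputStream in = new DataInputStream(cis))
            {
                return in.readLong();
            }
        }
    }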
